--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Red Hat <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <boost/asio/append.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include <boost/intrusive/list.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include "common/async/cancel_on_error.h"
+#include "common/async/co_waiter.h"
+#include "common/async/service.h"
+#include "include/ceph_assert.h"
+
+namespace ceph::async::detail {
+
+// Coroutine throttle implementation. This is reference-counted so the
+// co_spawn() completion handlers can extend the implementation's lifetime.
+// This is required for per-op cancellation because the cancellation_signals
+// must outlive their coroutine frames.
+template <boost::asio::execution::executor Executor>
+class co_throttle_impl :
+ public boost::intrusive_ref_counter<co_throttle_impl<Executor>,
+ boost::thread_unsafe_counter>,
+ public service_list_base_hook
+{
+ public:
+ using executor_type = Executor;
+ executor_type get_executor() const { return ex; }
+
+ co_throttle_impl(const executor_type& ex, size_t limit,
+ cancel_on_error on_error)
+ : svc(boost::asio::use_service<service<co_throttle_impl>>(
+ boost::asio::query(ex, boost::asio::execution::context))),
+ ex(ex), limit(limit), on_error(on_error),
+ children(new child[limit])
+ {
+ // register for service_shutdown() notifications
+ svc.add(*this);
+
+ // initialize the free list
+ for (size_t i = 0; i < limit; i++) {
+ free.push_back(children[i]);
+ }
+ }
+ ~co_throttle_impl()
+ {
+ svc.remove(*this);
+ }
+
+ auto spawn(boost::asio::awaitable<void, executor_type> cr,
+ size_t smaller_limit)
+ -> boost::asio::awaitable<void, executor_type>
+ {
+ if (unreported_exception && on_error != cancel_on_error::none) {
+ std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+ }
+
+ const size_t current_limit = std::min(smaller_limit, limit);
+ if (count >= current_limit) {
+ co_await wait_for(current_limit - 1);
+ if (unreported_exception && on_error != cancel_on_error::none) {
+ std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+ }
+ }
+
+ ++count;
+
+ // move a free child to the outstanding list
+ ceph_assert(!free.empty());
+ child& c = free.front();
+ free.pop_front();
+ outstanding.push_back(c);
+
+ // spawn the coroutine with its associated cancellation signal
+ c.signal.emplace();
+ c.canceled = false;
+
+ boost::asio::co_spawn(get_executor(), std::move(cr),
+ boost::asio::bind_cancellation_slot(c.signal->slot(),
+ child_completion{this, c}));
+
+ if (unreported_exception) {
+ std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+ }
+ }
+
+ auto wait()
+ -> boost::asio::awaitable<void, executor_type>
+ {
+ if (count > 0) {
+ co_await wait_for(0);
+ }
+ if (unreported_exception) {
+ std::rethrow_exception(std::exchange(unreported_exception, nullptr));
+ }
+ }
+
+ void cancel()
+ {
+ while (!outstanding.empty()) {
+ child& c = outstanding.front();
+ outstanding.pop_front();
+
+ c.canceled = true;
+ c.signal->emit(boost::asio::cancellation_type::terminal);
+ }
+ }
+
+ void service_shutdown()
+ {
+ waiter.shutdown();
+ }
+
+ private:
+ service<co_throttle_impl>& svc;
+ executor_type ex;
+ const size_t limit;
+ const cancel_on_error on_error;
+
+ size_t count = 0;
+ size_t wait_for_count = 0;
+
+ std::exception_ptr unreported_exception;
+
+ // track each spawned coroutine for cancellation. these are stored in an
+ // array, and recycled after each use via the free list
+ struct child : boost::intrusive::list_base_hook<> {
+ std::optional<boost::asio::cancellation_signal> signal;
+ bool canceled = false;
+ };
+ std::unique_ptr<child[]> children;
+
+ using child_list = boost::intrusive::list<child,
+ boost::intrusive::constant_time_size<false>>;
+ child_list outstanding;
+ child_list free;
+
+ co_waiter<void, executor_type> waiter;
+
+ // return an awaitable that completes once count <= target_count
+ auto wait_for(size_t target_count)
+ -> boost::asio::awaitable<void, executor_type>
+ {
+ wait_for_count = target_count;
+ return waiter.get();
+ }
+
+ void on_complete(child& c, std::exception_ptr eptr)
+ {
+ --count;
+
+ if (c.canceled) {
+ // if the child was canceled, it was already removed from outstanding
+ ceph_assert(!c.is_linked());
+ c.canceled = false;
+ c.signal.reset();
+ free.push_back(c);
+ } else {
+ // move back to the free list
+ ceph_assert(c.is_linked());
+ auto next = outstanding.erase(outstanding.iterator_to(c));
+ c.signal.reset();
+ free.push_back(c);
+
+ if (eptr) {
+ if (eptr && !unreported_exception) {
+ unreported_exception = eptr;
+ }
+
+ // handle cancel_on_error. cancellation signals may recurse into
+ // on_complete(), so move the entries into a separate list first
+ child_list to_cancel;
+ if (on_error == cancel_on_error::after) {
+ to_cancel.splice(to_cancel.end(), outstanding,
+ next, outstanding.end());
+ } else if (on_error == cancel_on_error::all) {
+ to_cancel = std::move(outstanding);
+ }
+
+ for (auto i = to_cancel.begin(); i != to_cancel.end(); ++i) {
+ child& c = *i;
+ i = to_cancel.erase(i);
+
+ c.canceled = true;
+ c.signal->emit(boost::asio::cancellation_type::terminal);
+ }
+ }
+ }
+
+ // maybe wake the waiter
+ if (waiter.waiting() && count <= wait_for_count) {
+ waiter.complete(nullptr);
+ }
+ }
+
+ struct child_completion {
+ boost::intrusive_ptr<co_throttle_impl> impl;
+ child& c;
+
+ void operator()(std::exception_ptr eptr) {
+ impl->on_complete(c, eptr);
+ }
+ };
+};
+
+} // namespace ceph::async::detail
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Red Hat <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/async/co_throttle.h"
+
+#include <latch>
+#include <optional>
+#include <boost/asio/any_io_executor.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/bind_executor.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/defer.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/thread_pool.hpp>
+#include <gtest/gtest.h>
+#include "common/async/co_waiter.h"
+
+namespace ceph::async {
+
+namespace asio = boost::asio;
+namespace errc = boost::system::errc;
+using boost::system::error_code;
+
+using executor_type = asio::any_io_executor;
+
+using void_waiter = co_waiter<void, executor_type>;
+
+auto capture(std::optional<std::exception_ptr>& eptr)
+{
+ return [&eptr] (std::exception_ptr e) { eptr = e; };
+}
+
+auto capture(asio::cancellation_signal& signal,
+ std::optional<std::exception_ptr>& eptr)
+{
+ return asio::bind_cancellation_slot(signal.slot(), capture(eptr));
+}
+
+asio::awaitable<void> wait(void_waiter& waiter, bool& completed)
+{
+ co_await waiter.get();
+ completed = true;
+}
+
+TEST(co_throttle, wait_empty)
+{
+ constexpr size_t limit = 1;
+ asio::io_context ctx;
+ executor_type ex = ctx.get_executor();
+
+ auto cr = [&] () -> asio::awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(result));
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+}
+
+TEST(co_throttle, spawn_over_limit)
+{
+ constexpr size_t limit = 1;
+ asio::io_context ctx;
+ executor_type ex = ctx.get_executor();
+
+ void_waiter waiter1;
+ void_waiter waiter2;
+ bool spawn1_completed = false;
+ bool spawn2_completed = false;
+
+ auto cr = [&] () -> asio::awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter1.get());
+ spawn1_completed = true;
+ co_await throttle.spawn(waiter2.get());
+ spawn2_completed = true;
+ co_await throttle.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(result));
+
+ ctx.poll(); // run until spawn2 blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn1_completed);
+ EXPECT_FALSE(spawn2_completed);
+
+ waiter1.complete(nullptr);
+
+ ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+ EXPECT_TRUE(spawn2_completed);
+
+ waiter2.complete(nullptr);
+
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+}
+
+TEST(co_throttle, spawn_over_smaller_limit)
+{
+ constexpr size_t limit = 2;
+ constexpr size_t smaller_limit = 1;
+ asio::io_context ctx;
+ executor_type ex = ctx.get_executor();
+
+ void_waiter waiter1;
+ void_waiter waiter2;
+ bool spawn1_completed = false;
+ bool spawn2_completed = false;
+
+ auto cr = [&] () -> asio::awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter1.get());
+ spawn1_completed = true;
+ co_await throttle.spawn(waiter2.get(), smaller_limit);
+ spawn2_completed = true;
+ co_await throttle.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(result));
+
+ ctx.poll(); // run until spawn2 blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn1_completed);
+ EXPECT_FALSE(spawn2_completed);
+
+ waiter1.complete(nullptr);
+
+ ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn2_completed);
+
+ waiter2.complete(nullptr);
+
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+}
+
+TEST(co_throttle, spawn_cancel)
+{
+ constexpr size_t limit = 1;
+ asio::io_context ctx;
+ executor_type ex = ctx.get_executor();
+
+ void_waiter waiter1;
+ void_waiter waiter2;
+ bool spawn1_completed = false;
+ bool spawn2_completed = false;
+
+ auto cr = [&] () -> asio::awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter1.get());
+ spawn1_completed = true;
+ co_await throttle.spawn(waiter2.get());
+ spawn2_completed = true;
+ co_await throttle.wait();
+ };
+
+ asio::cancellation_signal signal;
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(signal, result));
+
+ ctx.poll(); // run until spawn2 blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn1_completed);
+ EXPECT_FALSE(spawn2_completed);
+
+ // cancel before spawn2 completes
+ signal.emit(asio::cancellation_type::terminal);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped()); // poll runs to completion
+ EXPECT_FALSE(spawn2_completed);
+ ASSERT_TRUE(result);
+ try {
+ std::rethrow_exception(*result);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), asio::error::operation_aborted);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
+}
+
+TEST(co_throttle, wait_cancel)
+{
+ constexpr size_t limit = 1;
+ asio::io_context ctx;
+ executor_type ex = ctx.get_executor();
+
+ void_waiter waiter;
+ bool spawn_completed = false;
+
+ auto cr = [&] () -> asio::awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter.get());
+ spawn_completed = true;
+ co_await throttle.wait();
+ };
+
+ asio::cancellation_signal signal;
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(signal, result));
+
+ ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn_completed);
+ EXPECT_FALSE(result);
+
+ // cancel before wait completes
+ signal.emit(asio::cancellation_type::terminal);
+
+ ctx.poll();
+ ASSERT_TRUE(ctx.stopped()); // poll runs to completion
+ ASSERT_TRUE(result);
+ try {
+ std::rethrow_exception(*result);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), asio::error::operation_aborted);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
+}
+
+TEST(co_throttle, spawn_shutdown)
+{
+ constexpr size_t limit = 1;
+ asio::io_context ctx;
+ executor_type ex = ctx.get_executor();
+
+ void_waiter waiter1;
+ void_waiter waiter2;
+ bool spawn1_completed = false;
+
+ auto cr = [&] () -> asio::awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter1.get());
+ spawn1_completed = true;
+ co_await throttle.spawn(waiter2.get());
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(result));
+
+ ctx.poll(); // run until spawn2 blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn1_completed);
+ EXPECT_FALSE(result);
+ // shut down before spawn2 completes
+}
+
+TEST(co_throttle, wait_shutdown)
+{
+ constexpr size_t limit = 1;
+ asio::io_context ctx;
+ executor_type ex = ctx.get_executor();
+
+ void_waiter waiter;
+ bool spawn_completed = false;
+
+ auto cr = [&] () -> asio::awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter.get());
+ spawn_completed = true;
+ co_await throttle.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(result));
+
+ ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_TRUE(spawn_completed);
+ EXPECT_FALSE(result);
+ // shut down before wait completes
+}
+
+TEST(co_throttle, spawn_error)
+{
+ constexpr size_t limit = 2;
+ asio::io_context ctx;
+ executor_type ex = ctx.get_executor();
+
+ void_waiter waiter1;
+ void_waiter waiter2;
+ void_waiter waiter3;
+ bool cr1_completed = false;
+ bool cr2_completed = false;
+ bool cr3_completed = false;
+ std::exception_ptr spawn3_eptr;
+
+ auto cr = [&] () -> asio::awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(wait(waiter1, cr1_completed));
+ co_await throttle.spawn(wait(waiter2, cr2_completed));
+ try {
+ co_await throttle.spawn(wait(waiter3, cr3_completed));
+ } catch (const std::exception&) {
+ spawn3_eptr = std::current_exception();
+ }
+ co_await throttle.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(result));
+
+ ctx.poll(); // run until spawn3 blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(cr1_completed);
+ EXPECT_FALSE(cr2_completed);
+ EXPECT_FALSE(cr3_completed);
+
+ waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+ ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ ASSERT_TRUE(spawn3_eptr);
+ EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
+ EXPECT_FALSE(result);
+
+ waiter1.complete(nullptr);
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped()); // wait still blocked
+
+ waiter3.complete(nullptr);
+
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+ EXPECT_TRUE(cr1_completed);
+ EXPECT_FALSE(cr2_completed);
+ EXPECT_TRUE(cr3_completed); // cr3 isn't canceled by cr2's error
+}
+
+TEST(co_throttle, wait_error)
+{
+ constexpr size_t limit = 1;
+ asio::io_context ctx;
+ executor_type ex = ctx.get_executor();
+
+ void_waiter waiter;
+
+ auto cr = [&] () -> asio::awaitable<void> {
+ auto throttle = co_throttle{co_await asio::this_coro::executor, limit};
+ co_await throttle.spawn(waiter.get());
+ co_await throttle.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(result));
+
+ ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(result);
+
+ waiter.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+TEST(co_throttle, spawn_cancel_on_error_after)
+{
+ constexpr size_t limit = 2;
+ asio::io_context ctx;
+ executor_type ex = ctx.get_executor();
+
+ void_waiter waiter1;
+ void_waiter waiter2;
+ void_waiter waiter3;
+ void_waiter waiter4;
+ bool cr1_completed = false;
+ bool cr2_completed = false;
+ bool cr3_completed = false;
+ bool cr4_completed = false;
+ std::exception_ptr spawn3_eptr;
+
+ auto cr = [&] () -> asio::awaitable<void> {
+ auto ex = co_await asio::this_coro::executor;
+ auto throttle = co_throttle{ex, limit, cancel_on_error::after};
+ co_await throttle.spawn(wait(waiter1, cr1_completed));
+ co_await throttle.spawn(wait(waiter2, cr2_completed));
+ try {
+ co_await throttle.spawn(wait(waiter3, cr3_completed));
+ } catch (const std::exception&) {
+ spawn3_eptr = std::current_exception();
+ }
+ co_await throttle.spawn(wait(waiter4, cr4_completed));
+ co_await throttle.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(result));
+
+ ctx.poll(); // run until spawn3 blocks
+ ASSERT_FALSE(ctx.stopped());
+
+ waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+ ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ EXPECT_FALSE(cr1_completed);
+ ASSERT_TRUE(spawn3_eptr);
+ EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
+
+ waiter1.complete(nullptr);
+
+ ctx.poll();
+ ASSERT_FALSE(ctx.stopped()); // wait still blocked
+ EXPECT_FALSE(result);
+ EXPECT_TRUE(cr1_completed);
+ EXPECT_FALSE(cr4_completed);
+
+ waiter4.complete(nullptr);
+
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+ EXPECT_FALSE(cr2_completed); // exited by exception
+ EXPECT_FALSE(cr3_completed); // cr3 canceled
+ EXPECT_TRUE(cr4_completed); // cr4 not canceled
+}
+
+TEST(co_throttle, spawn_cancel_on_error_all)
+{
+ constexpr size_t limit = 2;
+ asio::io_context ctx;
+ executor_type ex = ctx.get_executor();
+
+ void_waiter waiter1;
+ void_waiter waiter2;
+ void_waiter waiter3;
+ void_waiter waiter4;
+ bool cr1_completed = false;
+ bool cr2_completed = false;
+ bool cr3_completed = false;
+ bool cr4_completed = false;
+ std::exception_ptr spawn3_eptr;
+
+ auto cr = [&] () -> asio::awaitable<void> {
+ auto ex = co_await asio::this_coro::executor;
+ auto throttle = co_throttle{ex, limit, cancel_on_error::all};
+ co_await throttle.spawn(wait(waiter1, cr1_completed));
+ co_await throttle.spawn(wait(waiter2, cr2_completed));
+ try {
+ co_await throttle.spawn(wait(waiter3, cr3_completed));
+ } catch (const std::exception&) {
+ spawn3_eptr = std::current_exception();
+ }
+ co_await throttle.spawn(wait(waiter4, cr4_completed));
+ co_await throttle.wait();
+ };
+
+ std::optional<std::exception_ptr> result;
+ asio::co_spawn(ex, cr(), capture(result));
+
+ ctx.poll(); // run until spawn3 blocks
+ ASSERT_FALSE(ctx.stopped());
+
+ waiter2.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+ ctx.poll(); // run until wait blocks
+ ASSERT_FALSE(ctx.stopped());
+ ASSERT_TRUE(spawn3_eptr);
+ EXPECT_THROW(std::rethrow_exception(spawn3_eptr), std::runtime_error);
+ EXPECT_FALSE(cr4_completed);
+
+ waiter4.complete(nullptr);
+
+ ctx.poll(); // run to completion
+ ASSERT_TRUE(ctx.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_FALSE(*result);
+ EXPECT_FALSE(cr1_completed); // cr1 canceled
+ EXPECT_FALSE(cr2_completed); // exited by exception
+ EXPECT_FALSE(cr3_completed); // cr3 canceled
+ EXPECT_TRUE(cr4_completed); // cr4 not canceled
+}
+
+TEST(co_throttle, cross_thread_cancel)
+{
+ constexpr size_t limit = 1;
+ // run the coroutine in a background thread
+ asio::thread_pool ctx{1};
+ executor_type ex = ctx.get_executor();
+
+ std::latch waiting{1};
+
+ auto cr = [ex, &waiting] () -> asio::awaitable<void> {
+ auto throttle = co_throttle{ex, limit};
+ co_waiter<void, executor_type> waiter;
+ co_await throttle.spawn(waiter.get());
+ // decrement the latch after throttle.wait() suspends
+ asio::defer(ex, [&waiting] { waiting.count_down(); });
+ co_await throttle.wait();
+ };
+
+ asio::cancellation_signal signal;
+ std::optional<std::exception_ptr> result;
+ // without bind_executor(), tsan identifies a data race on signal.emit()
+ asio::co_spawn(ex, cr(), bind_executor(ex, capture(signal, result)));
+
+ waiting.wait(); // wait until we've suspended in throttle.wait()
+
+ signal.emit(asio::cancellation_type::terminal);
+
+ ctx.join();
+ ASSERT_TRUE(result);
+ ASSERT_TRUE(*result);
+ try {
+ std::rethrow_exception(*result);
+ } catch (const boost::system::system_error& e) {
+ EXPECT_EQ(e.code(), asio::error::operation_aborted);
+ } catch (const std::exception&) {
+ EXPECT_THROW(throw, boost::system::system_error);
+ }
+}
+
+} // namespace ceph::async