#include <boost/asio/associated_cancellation_slot.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/asio/execution/context.hpp>
-#include <boost/asio/io_context.hpp>
#include <boost/asio/query.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/intrusive_ptr.hpp>
}
virtual ~spawn_throttle_impl() {}
- // factory function
- static auto create(optional_yield y, size_t limit, cancel_on_error on_error)
- -> boost::intrusive_ptr<spawn_throttle_impl>;
-
// return the completion handler for a new child. may block due to throttling
// or rethrow an exception from a previously-spawned child
spawn_throttle_handler get();
return {this, c};
}
-
-// Spawn throttle implementation for use in synchronous contexts where wait()
-// blocks the calling thread until completion.
-class sync_spawn_throttle_impl final : public spawn_throttle_impl {
- static constexpr int concurrency = 1; // only run from a single thread
- public:
- sync_spawn_throttle_impl(size_t limit, cancel_on_error on_error)
- : spawn_throttle_impl(limit, on_error),
- ctx(std::in_place, concurrency)
- {}
-
- executor_type get_executor() override
- {
- return ctx->get_executor();
- }
-
- void wait_for(size_t target_count) override
- {
- while (count > target_count) {
- if (ctx->stopped()) {
- ctx->restart();
- }
- ctx->run_one();
- }
-
- report_exception(); // throw unreported exception
- }
-
- void cancel(bool shutdown) override
- {
- spawn_throttle_impl::cancel(shutdown);
-
- if (shutdown) {
- // destroy the io_context to trigger two-phase shutdown which
- // destroys any completion handlers with a reference to 'this'
- ctx.reset();
- count = 0;
- }
- }
-
- private:
- std::optional<boost::asio::io_context> ctx;
-};
-
-// Spawn throttle implementation for use in asynchronous contexts where wait()
-// suspends the calling stackful coroutine.
class async_spawn_throttle_impl final :
public spawn_throttle_impl,
public service_list_base_hook
}
};
-inline auto spawn_throttle_impl::create(optional_yield y, size_t limit,
- cancel_on_error on_error)
- -> boost::intrusive_ptr<spawn_throttle_impl>
-{
- if (y) {
- auto yield = y.get_yield_context();
- return new async_spawn_throttle_impl(yield, limit, on_error);
- } else {
- return new sync_spawn_throttle_impl(limit, on_error);
- }
-}
-
} // namespace ceph::async::detail
namespace ceph::async {
-/// A coroutine throttle that allows a thread of execution to spawn and manage
+/// A coroutine throttle that allows a parent coroutine to spawn and manage
/// multiple child coroutines, while enforcing an upper bound on concurrency.
-/// The parent may either be a synchronous function or a stackful coroutine,
-/// depending on the optional_yield constructor argument.
///
/// Child coroutines take boost::asio::yield_context as the only argument.
/// Exceptions thrown by children are reported to the caller on its next call
/// @code
/// void child(boost::asio::yield_context yield);
///
-/// void parent(size_t count, optional_yield y)
+/// void parent(size_t count, boost::asio::yield_context yield)
/// {
/// // spawn all children, up to 10 at a time
-/// auto throttle = ceph::async::spawn_throttle{y, 10};
+/// auto throttle = ceph::async::spawn_throttle{yield, 10};
///
/// for (size_t i = 0; i < count; i++) {
/// throttle.spawn(child);
boost::intrusive_ptr<impl_type> impl;
public:
- spawn_throttle(optional_yield y, size_t limit,
+ spawn_throttle(boost::asio::yield_context yield, size_t limit,
cancel_on_error on_error = cancel_on_error::none)
- : impl(detail::spawn_throttle_impl::create(y, limit, on_error))
+ : impl(new detail::async_spawn_throttle_impl(yield, limit, on_error))
{}
spawn_throttle(spawn_throttle&&) = default;
}
-TEST(YieldGroupSync, wait_empty)
-{
- auto throttle = spawn_throttle{null_yield, 2};
- throttle.wait();
-}
-
-TEST(YieldGroupSync, spawn_wait)
-{
- int completed = 0;
- auto cr = [&] (asio::yield_context yield) {
- wait_for(1ms, yield);
- ++completed;
- };
-
- auto throttle = spawn_throttle{null_yield, 2};
- throttle.spawn(cr);
- throttle.wait();
-
- EXPECT_EQ(1, completed);
-}
-
-TEST(YieldGroupSync, spawn_shutdown)
-{
- auto throttle = spawn_throttle{null_yield, 2};
- throttle.spawn(wait_for(1s));
-}
-
-TEST(YieldGroupSync, spawn_cancel_wait)
-{
- int completed = 0;
-
- auto cr = [&] (asio::yield_context yield) {
- wait_for(1s, yield);
- ++completed;
- };
-
- auto throttle = spawn_throttle{null_yield, 2};
- throttle.spawn(cr);
- throttle.cancel();
- EXPECT_THROW(throttle.wait(), boost::system::system_error);
-
- EXPECT_EQ(0, completed);
-}
-
-TEST(YieldGroupSync, spawn_cancel_wait_spawn_wait)
-{
- int completed = 0;
-
- auto cr = [&] (asio::yield_context yield) {
- wait_for(1ms, yield);
- ++completed;
- };
-
- auto throttle = spawn_throttle{null_yield, 2};
- throttle.spawn(cr);
- throttle.cancel();
- EXPECT_THROW(throttle.wait(), boost::system::system_error);
- throttle.spawn(cr);
- throttle.wait();
-
- EXPECT_EQ(1, completed);
-}
-
-TEST(YieldGroupSync, spawn_over_limit)
-{
- int concurrent = 0;
- int max_concurrent = 0;
- int completed = 0;
-
- auto cr = [&] (asio::yield_context yield) {
- ++concurrent;
- if (max_concurrent < concurrent) {
- max_concurrent = concurrent;
- }
-
- wait_for(1ms, yield);
-
- --concurrent;
- ++completed;
- };
-
- auto throttle = spawn_throttle{null_yield, 2};
- throttle.spawn(cr);
- throttle.spawn(cr);
- throttle.spawn(cr); // blocks
- throttle.spawn(cr); // blocks
- throttle.wait(); // blocks
-
- EXPECT_EQ(0, concurrent);
- EXPECT_EQ(2, max_concurrent);
- EXPECT_EQ(4, completed);
-}
-
-TEST(YieldGroupSync, spawn_cancel_on_error_none)
-{
- int completed = 0;
-
- auto cr = [&] (asio::yield_context yield) {
- wait_for(10ms, yield);
- ++completed;
- };
- auto err = [] (asio::yield_context yield) {
- wait_for(0ms, yield);
- throw std::logic_error{"err"};
- };
-
- auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::none};
- throttle.spawn(cr);
- throttle.spawn(cr);
- throttle.spawn(err);
- throttle.spawn(cr);
- EXPECT_THROW(throttle.wait(), std::logic_error);
-
- EXPECT_EQ(3, completed);
-}
-
-TEST(YieldGroupSync, spawn_cancel_on_error_after)
-{
- int completed = 0;
-
- auto cr = [&] (asio::yield_context yield) {
- wait_for(10ms, yield);
- ++completed;
- };
- auto err = [] (asio::yield_context yield) {
- wait_for(0ms, yield);
- throw std::logic_error{"err"};
- };
-
- auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::after};
- throttle.spawn(cr);
- throttle.spawn(cr);
- throttle.spawn(err);
- throttle.spawn(cr);
- EXPECT_THROW(throttle.wait(), std::logic_error);
-
- EXPECT_EQ(2, completed);
-}
-
-TEST(YieldGroupSync, spawn_cancel_on_error_all)
-{
- int completed = 0;
-
- auto cr = [&] (asio::yield_context yield) {
- wait_for(1s, yield);
- ++completed;
- };
- auto err = [] (asio::yield_context yield) {
- wait_for(0ms, yield);
- throw std::logic_error{"err"};
- };
-
- auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::all};
- throttle.spawn(cr);
- throttle.spawn(cr);
- throttle.spawn(err);
- throttle.spawn(cr);
- EXPECT_THROW(throttle.wait(), std::logic_error);
-
- EXPECT_EQ(0, completed);
-}
-
-
TEST(YieldGroupAsync, wait_empty)
{
asio::io_context ctx;