/// The parent may either be a synchronous function or a stackful coroutine,
/// depending on the optional_yield constructor argument.
///
-/// Child coroutines are spawned by calling boost::asio::spawn() and using the
-/// spawn_throttle object as the CompletionToken argument. Exceptions thrown
-/// by children are reported to the caller on its next call to get() or wait().
-/// The cancel_on_error option controls whether these exceptions trigger the
-/// cancellation of other children.
+/// Child coroutines take boost::asio::yield_context as the only argument.
+/// Exceptions thrown by children are reported to the caller on its next call
+/// to spawn() or wait(). The cancel_on_error option controls whether these
+/// exceptions trigger the cancellation of other children.
///
/// All child coroutines are canceled by cancel() or spawn_throttle destruction.
/// This allows a parent function to share memory with its child coroutines
/// auto throttle = ceph::async::spawn_throttle{y, 10};
///
/// for (size_t i = 0; i < count; i++) {
-/// boost::asio::spawn(throttle.get_executor(), child, throttle);
+/// throttle.spawn(child);
/// }
/// throttle.wait();
/// }
return impl->get_executor();
}
- /// Return a cancellable spawn() completion handler with signature
- /// void(std::exception_ptr).
+ /// Spawn a cancellable coroutine to call the given function, passing its
+ /// boost::asio::yield_context as the only argument.
///
- /// This function may block until a throttle unit becomes available. If one or
- /// more previously-spawned coroutines exit with an exception, the first such
- /// exception is rethrown here.
- ///
- /// As a convenience, you can avoid calling this function by using the
- /// spawn_throttle itself as a CompletionToken for spawn().
- auto get()
- -> detail::spawn_throttle_handler
+ /// Before spawning, this function may block until a throttle unit becomes
+ /// available. If one or more previously-spawned coroutines exit with an
+ /// exception, the first such exception is rethrown here.
+ template <typename F>
+ void spawn(F&& f)
+ {
+ boost::asio::spawn(get_executor(), std::forward<F>(f), impl->get());
+ }
+
+ /// /overload
+ template <typename StackAllocator, typename F>
+ void spawn(std::allocator_arg_t arg, StackAllocator&& alloc, F&& f)
{
- return impl->get();
+ boost::asio::spawn(get_executor(), arg, std::forward<StackAllocator>(alloc),
+ std::forward<F>(f), impl->get());
}
/// Wait for all outstanding completions before returning. If any
};
} // namespace ceph::async
-
-namespace boost::asio {
-
-// Allow spawn_throttle to be used as a CompletionToken.
-template <typename Signature>
-struct async_result<ceph::async::spawn_throttle, Signature>
-{
- using completion_handler_type =
- ceph::async::detail::spawn_throttle_handler;
- async_result(completion_handler_type&) {}
-
- using return_type = void;
- return_type get() {}
-
- template <typename Initiation, typename... Args>
- static return_type initiate(Initiation&& init,
- ceph::async::spawn_throttle& throttle,
- Args&& ...args)
- {
- return std::move(init)(throttle.get(), std::forward<Args>(args)...);
- }
-};
-
-} // namespace boost::asio
};
auto throttle = spawn_throttle{null_yield, 2};
- asio::spawn(throttle.get_executor(), cr, throttle);
+ throttle.spawn(cr);
throttle.wait();
EXPECT_EQ(1, completed);
TEST(YieldGroupSync, spawn_shutdown)
{
auto throttle = spawn_throttle{null_yield, 2};
- asio::spawn(throttle.get_executor(), wait_for(1s), throttle);
+ throttle.spawn(wait_for(1s));
}
TEST(YieldGroupSync, spawn_cancel_wait)
};
auto throttle = spawn_throttle{null_yield, 2};
- asio::spawn(throttle.get_executor(), cr, throttle);
+ throttle.spawn(cr);
throttle.cancel();
EXPECT_THROW(throttle.wait(), boost::system::system_error);
};
auto throttle = spawn_throttle{null_yield, 2};
- asio::spawn(throttle.get_executor(), cr, throttle);
+ throttle.spawn(cr);
throttle.cancel();
EXPECT_THROW(throttle.wait(), boost::system::system_error);
- asio::spawn(throttle.get_executor(), cr, throttle);
+ throttle.spawn(cr);
throttle.wait();
EXPECT_EQ(1, completed);
};
auto throttle = spawn_throttle{null_yield, 2};
- asio::spawn(throttle.get_executor(), cr, throttle);
- asio::spawn(throttle.get_executor(), cr, throttle);
- asio::spawn(throttle.get_executor(), cr, throttle); // blocks
- asio::spawn(throttle.get_executor(), cr, throttle); // blocks
+ throttle.spawn(cr);
+ throttle.spawn(cr);
+ throttle.spawn(cr); // blocks
+ throttle.spawn(cr); // blocks
throttle.wait(); // blocks
EXPECT_EQ(0, concurrent);
};
auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::none};
- asio::spawn(throttle.get_executor(), cr, throttle);
- asio::spawn(throttle.get_executor(), cr, throttle);
- asio::spawn(throttle.get_executor(), err, throttle);
- asio::spawn(throttle.get_executor(), cr, throttle);
+ throttle.spawn(cr);
+ throttle.spawn(cr);
+ throttle.spawn(err);
+ throttle.spawn(cr);
EXPECT_THROW(throttle.wait(), std::logic_error);
EXPECT_EQ(3, completed);
};
auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::after};
- asio::spawn(throttle.get_executor(), cr, throttle);
- asio::spawn(throttle.get_executor(), cr, throttle);
- asio::spawn(throttle.get_executor(), err, throttle);
- asio::spawn(throttle.get_executor(), cr, throttle);
+ throttle.spawn(cr);
+ throttle.spawn(cr);
+ throttle.spawn(err);
+ throttle.spawn(cr);
EXPECT_THROW(throttle.wait(), std::logic_error);
EXPECT_EQ(2, completed);
};
auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::all};
- asio::spawn(throttle.get_executor(), cr, throttle);
- asio::spawn(throttle.get_executor(), cr, throttle);
- asio::spawn(throttle.get_executor(), err, throttle);
- asio::spawn(throttle.get_executor(), cr, throttle);
+ throttle.spawn(cr);
+ throttle.spawn(cr);
+ throttle.spawn(err);
+ throttle.spawn(cr);
EXPECT_THROW(throttle.wait(), std::logic_error);
EXPECT_EQ(0, completed);
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 2};
- asio::spawn(yield, wait_on(waiter), throttle);
+ throttle.spawn(wait_on(waiter));
throttle.wait(); // blocks
}, rethrow);
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 2};
- asio::spawn(yield, wait_on(waiter1), throttle);
- asio::spawn(yield, wait_on(waiter2), throttle);
- asio::spawn(yield, wait_on(waiter3), throttle); // blocks
- asio::spawn(yield, wait_on(waiter4), throttle); // blocks
+ throttle.spawn(wait_on(waiter1));
+ throttle.spawn(wait_on(waiter2));
+ throttle.spawn(wait_on(waiter3)); // blocks
+ throttle.spawn(wait_on(waiter4)); // blocks
throttle.wait(); // blocks
}, rethrow);
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 2};
- asio::spawn(yield, wait_on(waiter1), throttle);
+ throttle.spawn(wait_on(waiter1));
waiter2.async_wait(yield); // blocks
// shut down while there's an outstanding child but throttle is not
// waiting on spawn() or wait()
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 1};
- asio::spawn(yield, wait_on(waiter1), throttle);
- asio::spawn(yield, wait_on(waiter2), throttle); // blocks
+ throttle.spawn(wait_on(waiter1));
+ throttle.spawn(wait_on(waiter2)); // blocks
// shut down while we're throttled on the second spawn
}, rethrow);
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 1};
- asio::spawn(yield, wait_on(waiter), throttle);
+ throttle.spawn(wait_on(waiter));
throttle.wait(); // blocks
// shut down while we're wait()ing
}, rethrow);
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 1};
- asio::spawn(yield, wait_on(waiter1), throttle);
- asio::spawn(yield, wait_on(waiter2), throttle); // blocks
+ throttle.spawn(wait_on(waiter1));
+ throttle.spawn(wait_on(waiter2)); // blocks
}, capture(result));
ctx.poll();
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 1};
- asio::spawn(yield, wait_on(waiter1), throttle);
- asio::spawn(yield, wait_on(waiter2), throttle); // blocks
+ throttle.spawn(wait_on(waiter1));
+ throttle.spawn(wait_on(waiter2)); // blocks
}, capture(signal, result));
ctx.poll();
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 1};
- asio::spawn(yield, wait_on(waiter), throttle);
+ throttle.spawn(wait_on(waiter));
throttle.wait(); // blocks
}, capture(result));
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 1};
- asio::spawn(yield, wait_on(waiter), throttle);
+ throttle.spawn(wait_on(waiter));
throttle.wait(); // blocks
}, capture(signal, result));
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 2};
- asio::spawn(yield, wait_on(waiter), throttle);
+ throttle.spawn(wait_on(waiter));
throttle.cancel();
throttle.wait();
}, capture(result));
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 4, cancel_on_error::none};
- asio::spawn(yield, wait_on(waiter1), throttle);
- asio::spawn(yield, wait_on(waiter2), throttle);
- asio::spawn(yield, wait_on(waiter3), throttle);
+ throttle.spawn(wait_on(waiter1));
+ throttle.spawn(wait_on(waiter2));
+ throttle.spawn(wait_on(waiter3));
throttle.wait(); // blocks
}, capture(result));
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 4, cancel_on_error::after};
- asio::spawn(yield, wait_on(waiter1), throttle);
- asio::spawn(yield, wait_on(waiter2), throttle);
- asio::spawn(yield, wait_on(waiter3), throttle);
+ throttle.spawn(wait_on(waiter1));
+ throttle.spawn(wait_on(waiter2));
+ throttle.spawn(wait_on(waiter3));
throttle.wait(); // blocks
}, capture(result));
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 4, cancel_on_error::all};
- asio::spawn(yield, wait_on(waiter1), throttle);
- asio::spawn(yield, wait_on(waiter2), throttle);
- asio::spawn(yield, wait_on(waiter3), throttle);
+ throttle.spawn(wait_on(waiter1));
+ throttle.spawn(wait_on(waiter2));
+ throttle.spawn(wait_on(waiter3));
throttle.wait(); // blocks
}, capture(result));
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 1};
- asio::spawn(yield, wait_on(waiter1), throttle);
+ throttle.spawn(wait_on(waiter1));
throttle.wait(); // blocks
- asio::spawn(yield, wait_on(waiter2), throttle);
+ throttle.spawn(wait_on(waiter2));
throttle.wait(); // blocks
}, rethrow);
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 1};
- asio::spawn(yield, wait_on(waiter1), throttle);
+ throttle.spawn(wait_on(waiter1));
throttle.cancel();
EXPECT_THROW(throttle.wait(), boost::system::system_error);
- asio::spawn(yield, wait_on(waiter2), throttle);
+ throttle.spawn(wait_on(waiter2));
throttle.wait(); // blocks
}, rethrow);
asio::spawn(ctx, [&] (asio::yield_context yield) {
auto throttle = spawn_throttle{yield, 1};
- asio::spawn(yield, wait_on(waiter1), throttle);
+ throttle.spawn(wait_on(waiter1));
EXPECT_THROW(throttle.wait(), boost::system::system_error);
- asio::spawn(yield, wait_on(waiter2), throttle);
+ throttle.spawn(wait_on(waiter2));
throttle.wait(); // blocks
}, rethrow);