From: Casey Bodley Date: Mon, 22 Jul 2024 20:48:29 +0000 (-0400) Subject: common/async: spawn_throttle wraps call to asio::spawn() X-Git-Tag: testing/wip-pdonnell-testing-20240726.202642-debug~11^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=f03d0cec0fcb52c8c36e80d0976cb2eaedf547f1;p=ceph-ci.git common/async: spawn_throttle wraps call to asio::spawn() cancellation of the parent must immediately cancel its children, which only works if the children are on the same executor as the parent prohibit child coroutines from being spawned on a different executor by wrapping the call to asio::spawn() in a new spawn_throttle::spawn() interface expose an overload for asio::spawn()'s optional StackAllocator argument Signed-off-by: Casey Bodley --- diff --git a/src/common/async/max_concurrent_for_each.h b/src/common/async/max_concurrent_for_each.h index c0789a16064..04f61729ba7 100644 --- a/src/common/async/max_concurrent_for_each.h +++ b/src/common/async/max_concurrent_for_each.h @@ -60,10 +60,9 @@ void max_concurrent_for_each(Iterator begin, } auto throttle = spawn_throttle{y, max_concurrent, on_error}; for (Iterator i = begin; i != end; ++i) { - boost::asio::spawn(throttle.get_executor(), - [&func, &val = *i] (boost::asio::yield_context yield) { - func(val, yield); - }, throttle); + throttle.spawn([&func, &val = *i] (boost::asio::yield_context yield) { + func(val, yield); + }); } throttle.wait(); } diff --git a/src/common/async/spawn_throttle.h b/src/common/async/spawn_throttle.h index 02e07ca3deb..1fdff1928c7 100644 --- a/src/common/async/spawn_throttle.h +++ b/src/common/async/spawn_throttle.h @@ -28,11 +28,10 @@ namespace ceph::async { /// The parent may either be a synchronous function or a stackful coroutine, /// depending on the optional_yield constructor argument. /// -/// Child coroutines are spawned by calling boost::asio::spawn() and using the -/// spawn_throttle object as the CompletionToken argument. Exceptions thrown -/// by children are reported to the caller on its next call to get() or wait(). -/// The cancel_on_error option controls whether these exceptions trigger the -/// cancellation of other children. +/// Child coroutines take boost::asio::yield_context as the only argument. +/// Exceptions thrown by children are reported to the caller on its next call +/// to spawn() or wait(). The cancel_on_error option controls whether these +/// exceptions trigger the cancellation of other children. /// /// All child coroutines are canceled by cancel() or spawn_throttle destruction. /// This allows a parent function to share memory with its child coroutines @@ -51,7 +50,7 @@ namespace ceph::async { /// auto throttle = ceph::async::spawn_throttle{y, 10}; /// /// for (size_t i = 0; i < count; i++) { -/// boost::asio::spawn(throttle.get_executor(), child, throttle); +/// throttle.spawn(child); /// } /// throttle.wait(); /// } @@ -86,19 +85,24 @@ class spawn_throttle { return impl->get_executor(); } - /// Return a cancellable spawn() completion handler with signature - /// void(std::exception_ptr). + /// Spawn a cancellable coroutine to call the given function, passing its + /// boost::asio::yield_context as the only argument. /// - /// This function may block until a throttle unit becomes available. If one or - /// more previously-spawned coroutines exit with an exception, the first such - /// exception is rethrown here. - /// - /// As a convenience, you can avoid calling this function by using the - /// spawn_throttle itself as a CompletionToken for spawn(). - auto get() - -> detail::spawn_throttle_handler + /// Before spawning, this function may block until a throttle unit becomes + /// available. If one or more previously-spawned coroutines exit with an + /// exception, the first such exception is rethrown here. + template + void spawn(F&& f) + { + boost::asio::spawn(get_executor(), std::forward(f), impl->get()); + } + + /// /overload + template + void spawn(std::allocator_arg_t arg, StackAllocator&& alloc, F&& f) { - return impl->get(); + boost::asio::spawn(get_executor(), arg, std::forward(alloc), + std::forward(f), impl->get()); } /// Wait for all outstanding completions before returning. If any @@ -120,27 +124,3 @@ class spawn_throttle { }; } // namespace ceph::async - -namespace boost::asio { - -// Allow spawn_throttle to be used as a CompletionToken. -template -struct async_result -{ - using completion_handler_type = - ceph::async::detail::spawn_throttle_handler; - async_result(completion_handler_type&) {} - - using return_type = void; - return_type get() {} - - template - static return_type initiate(Initiation&& init, - ceph::async::spawn_throttle& throttle, - Args&& ...args) - { - return std::move(init)(throttle.get(), std::forward(args)...); - } -}; - -} // namespace boost::asio diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 31a74e183e6..ace916cd7ea 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -7156,10 +7156,9 @@ void RGWDeleteMultiObj::execute(optional_yield y) auto group = ceph::async::spawn_throttle{y, max_aio}; for (const auto& key : multi_delete->objects) { - boost::asio::spawn(group.get_executor(), - [this, &key] (boost::asio::yield_context yield) { - handle_individual_object(key, yield); - }, group); + group.spawn([this, &key] (boost::asio::yield_context yield) { + handle_individual_object(key, yield); + }); rgw_flush_formatter(s, s->formatter); } diff --git a/src/test/common/test_async_spawn_throttle.cc b/src/test/common/test_async_spawn_throttle.cc index 6f306a13918..7e19738a8e2 100644 --- a/src/test/common/test_async_spawn_throttle.cc +++ b/src/test/common/test_async_spawn_throttle.cc @@ -81,7 +81,7 @@ TEST(YieldGroupSync, spawn_wait) }; auto throttle = spawn_throttle{null_yield, 2}; - asio::spawn(throttle.get_executor(), cr, throttle); + throttle.spawn(cr); throttle.wait(); EXPECT_EQ(1, completed); @@ -90,7 +90,7 @@ TEST(YieldGroupSync, spawn_wait) TEST(YieldGroupSync, spawn_shutdown) { auto throttle = spawn_throttle{null_yield, 2}; - asio::spawn(throttle.get_executor(), wait_for(1s), throttle); + throttle.spawn(wait_for(1s)); } TEST(YieldGroupSync, spawn_cancel_wait) @@ -103,7 +103,7 @@ TEST(YieldGroupSync, spawn_cancel_wait) }; auto throttle = spawn_throttle{null_yield, 2}; - asio::spawn(throttle.get_executor(), cr, throttle); + throttle.spawn(cr); throttle.cancel(); EXPECT_THROW(throttle.wait(), boost::system::system_error); @@ -120,10 +120,10 @@ TEST(YieldGroupSync, spawn_cancel_wait_spawn_wait) }; auto throttle = spawn_throttle{null_yield, 2}; - asio::spawn(throttle.get_executor(), cr, throttle); + throttle.spawn(cr); throttle.cancel(); EXPECT_THROW(throttle.wait(), boost::system::system_error); - asio::spawn(throttle.get_executor(), cr, throttle); + throttle.spawn(cr); throttle.wait(); EXPECT_EQ(1, completed); @@ -148,10 +148,10 @@ TEST(YieldGroupSync, spawn_over_limit) }; auto throttle = spawn_throttle{null_yield, 2}; - asio::spawn(throttle.get_executor(), cr, throttle); - asio::spawn(throttle.get_executor(), cr, throttle); - asio::spawn(throttle.get_executor(), cr, throttle); // blocks - asio::spawn(throttle.get_executor(), cr, throttle); // blocks + throttle.spawn(cr); + throttle.spawn(cr); + throttle.spawn(cr); // blocks + throttle.spawn(cr); // blocks throttle.wait(); // blocks EXPECT_EQ(0, concurrent); @@ -173,10 +173,10 @@ TEST(YieldGroupSync, spawn_cancel_on_error_none) }; auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::none}; - asio::spawn(throttle.get_executor(), cr, throttle); - asio::spawn(throttle.get_executor(), cr, throttle); - asio::spawn(throttle.get_executor(), err, throttle); - asio::spawn(throttle.get_executor(), cr, throttle); + throttle.spawn(cr); + throttle.spawn(cr); + throttle.spawn(err); + throttle.spawn(cr); EXPECT_THROW(throttle.wait(), std::logic_error); EXPECT_EQ(3, completed); @@ -196,10 +196,10 @@ TEST(YieldGroupSync, spawn_cancel_on_error_after) }; auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::after}; - asio::spawn(throttle.get_executor(), cr, throttle); - asio::spawn(throttle.get_executor(), cr, throttle); - asio::spawn(throttle.get_executor(), err, throttle); - asio::spawn(throttle.get_executor(), cr, throttle); + throttle.spawn(cr); + throttle.spawn(cr); + throttle.spawn(err); + throttle.spawn(cr); EXPECT_THROW(throttle.wait(), std::logic_error); EXPECT_EQ(2, completed); @@ -219,10 +219,10 @@ TEST(YieldGroupSync, spawn_cancel_on_error_all) }; auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::all}; - asio::spawn(throttle.get_executor(), cr, throttle); - asio::spawn(throttle.get_executor(), cr, throttle); - asio::spawn(throttle.get_executor(), err, throttle); - asio::spawn(throttle.get_executor(), cr, throttle); + throttle.spawn(cr); + throttle.spawn(cr); + throttle.spawn(err); + throttle.spawn(cr); EXPECT_THROW(throttle.wait(), std::logic_error); EXPECT_EQ(0, completed); @@ -247,7 +247,7 @@ TEST(YieldGroupAsync, spawn_wait) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 2}; - asio::spawn(yield, wait_on(waiter), throttle); + throttle.spawn(wait_on(waiter)); throttle.wait(); // blocks }, rethrow); @@ -273,10 +273,10 @@ TEST(YieldGroupAsync, spawn_over_limit) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 2}; - asio::spawn(yield, wait_on(waiter1), throttle); - asio::spawn(yield, wait_on(waiter2), throttle); - asio::spawn(yield, wait_on(waiter3), throttle); // blocks - asio::spawn(yield, wait_on(waiter4), throttle); // blocks + throttle.spawn(wait_on(waiter1)); + throttle.spawn(wait_on(waiter2)); + throttle.spawn(wait_on(waiter3)); // blocks + throttle.spawn(wait_on(waiter4)); // blocks throttle.wait(); // blocks }, rethrow); @@ -320,7 +320,7 @@ TEST(YieldGroupAsync, spawn_shutdown) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 2}; - asio::spawn(yield, wait_on(waiter1), throttle); + throttle.spawn(wait_on(waiter1)); waiter2.async_wait(yield); // blocks // shut down while there's an outstanding child but throttle is not // waiting on spawn() or wait() @@ -340,8 +340,8 @@ TEST(YieldGroupAsync, spawn_throttled_shutdown) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 1}; - asio::spawn(yield, wait_on(waiter1), throttle); - asio::spawn(yield, wait_on(waiter2), throttle); // blocks + throttle.spawn(wait_on(waiter1)); + throttle.spawn(wait_on(waiter2)); // blocks // shut down while we're throttled on the second spawn }, rethrow); @@ -358,7 +358,7 @@ TEST(YieldGroupAsync, spawn_wait_shutdown) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 1}; - asio::spawn(yield, wait_on(waiter), throttle); + throttle.spawn(wait_on(waiter)); throttle.wait(); // blocks // shut down while we're wait()ing }, rethrow); @@ -378,8 +378,8 @@ TEST(YieldGroupAsync, spawn_throttled_error) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 1}; - asio::spawn(yield, wait_on(waiter1), throttle); - asio::spawn(yield, wait_on(waiter2), throttle); // blocks + throttle.spawn(wait_on(waiter1)); + throttle.spawn(wait_on(waiter2)); // blocks }, capture(result)); ctx.poll(); @@ -413,8 +413,8 @@ TEST(YieldGroupAsync, spawn_throttled_signal) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 1}; - asio::spawn(yield, wait_on(waiter1), throttle); - asio::spawn(yield, wait_on(waiter2), throttle); // blocks + throttle.spawn(wait_on(waiter1)); + throttle.spawn(wait_on(waiter2)); // blocks }, capture(signal, result)); ctx.poll(); @@ -446,7 +446,7 @@ TEST(YieldGroupAsync, spawn_wait_error) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 1}; - asio::spawn(yield, wait_on(waiter), throttle); + throttle.spawn(wait_on(waiter)); throttle.wait(); // blocks }, capture(result)); @@ -479,7 +479,7 @@ TEST(YieldGroupAsync, spawn_wait_signal) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 1}; - asio::spawn(yield, wait_on(waiter), throttle); + throttle.spawn(wait_on(waiter)); throttle.wait(); // blocks }, capture(signal, result)); @@ -511,7 +511,7 @@ TEST(YieldGroupAsync, spawn_cancel_wait) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 2}; - asio::spawn(yield, wait_on(waiter), throttle); + throttle.spawn(wait_on(waiter)); throttle.cancel(); throttle.wait(); }, capture(result)); @@ -539,9 +539,9 @@ TEST(YieldGroupAsync, spawn_cancel_on_error_none) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 4, cancel_on_error::none}; - asio::spawn(yield, wait_on(waiter1), throttle); - asio::spawn(yield, wait_on(waiter2), throttle); - asio::spawn(yield, wait_on(waiter3), throttle); + throttle.spawn(wait_on(waiter1)); + throttle.spawn(wait_on(waiter2)); + throttle.spawn(wait_on(waiter3)); throttle.wait(); // blocks }, capture(result)); @@ -586,9 +586,9 @@ TEST(YieldGroupAsync, spawn_cancel_on_error_after) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 4, cancel_on_error::after}; - asio::spawn(yield, wait_on(waiter1), throttle); - asio::spawn(yield, wait_on(waiter2), throttle); - asio::spawn(yield, wait_on(waiter3), throttle); + throttle.spawn(wait_on(waiter1)); + throttle.spawn(wait_on(waiter2)); + throttle.spawn(wait_on(waiter3)); throttle.wait(); // blocks }, capture(result)); @@ -629,9 +629,9 @@ TEST(YieldGroupAsync, spawn_cancel_on_error_all) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 4, cancel_on_error::all}; - asio::spawn(yield, wait_on(waiter1), throttle); - asio::spawn(yield, wait_on(waiter2), throttle); - asio::spawn(yield, wait_on(waiter3), throttle); + throttle.spawn(wait_on(waiter1)); + throttle.spawn(wait_on(waiter2)); + throttle.spawn(wait_on(waiter3)); throttle.wait(); // blocks }, capture(result)); @@ -665,9 +665,9 @@ TEST(YieldGroupAsync, spawn_wait_spawn_wait) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 1}; - asio::spawn(yield, wait_on(waiter1), throttle); + throttle.spawn(wait_on(waiter1)); throttle.wait(); // blocks - asio::spawn(yield, wait_on(waiter2), throttle); + throttle.spawn(wait_on(waiter2)); throttle.wait(); // blocks }, rethrow); @@ -698,10 +698,10 @@ TEST(YieldGroupAsync, spawn_cancel_wait_spawn_wait) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 1}; - asio::spawn(yield, wait_on(waiter1), throttle); + throttle.spawn(wait_on(waiter1)); throttle.cancel(); EXPECT_THROW(throttle.wait(), boost::system::system_error); - asio::spawn(yield, wait_on(waiter2), throttle); + throttle.spawn(wait_on(waiter2)); throttle.wait(); // blocks }, rethrow); @@ -723,9 +723,9 @@ TEST(YieldGroupAsync, spawn_error_wait_spawn_wait) asio::spawn(ctx, [&] (asio::yield_context yield) { auto throttle = spawn_throttle{yield, 1}; - asio::spawn(yield, wait_on(waiter1), throttle); + throttle.spawn(wait_on(waiter1)); EXPECT_THROW(throttle.wait(), boost::system::system_error); - asio::spawn(yield, wait_on(waiter2), throttle); + throttle.spawn(wait_on(waiter2)); throttle.wait(); // blocks }, rethrow);