From eface5421a211b2c38ab5d201e30c6ba138600a6 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sun, 29 Jan 2023 20:31:21 -0500 Subject: [PATCH] common/async: co_throttle_impl uses co_waiter Signed-off-by: Casey Bodley --- src/common/async/detail/co_throttle_impl.h | 39 ++++++---------------- 1 file changed, 11 insertions(+), 28 deletions(-) diff --git a/src/common/async/detail/co_throttle_impl.h b/src/common/async/detail/co_throttle_impl.h index 43e4fcdbcaec6..d1a89db94c064 100644 --- a/src/common/async/detail/co_throttle_impl.h +++ b/src/common/async/detail/co_throttle_impl.h @@ -23,6 +23,7 @@ #include #include #include "common/async/cancel_on_error.h" +#include "common/async/co_waiter.h" #include "common/async/service.h" #include "include/ceph_assert.h" @@ -127,14 +128,16 @@ class co_throttle_impl : c.canceled = true; c.signal->emit(boost::asio::cancellation_type::terminal); } - if (wait_handler) { - wait_complete(make_error_code(boost::asio::error::operation_aborted)); + if (waiter.waiting()) { + auto eptr = std::exchange(unreported_exception, nullptr); + auto ec = make_error_code(boost::asio::error::operation_aborted); + waiter.complete(eptr, ec); } } void service_shutdown() { - wait_handler.reset(); + waiter.shutdown(); } private: @@ -162,25 +165,14 @@ class co_throttle_impl : child_list outstanding; child_list free; - using use_awaitable_t = boost::asio::use_awaitable_t; - - using wait_signature = void(std::exception_ptr, boost::system::error_code); - using wait_handler_type = typename boost::asio::async_result< - use_awaitable_t, wait_signature>::handler_type; - std::optional wait_handler; + co_waiter waiter; // return an awaitable that completes once count <= target_count auto wait_for(size_type target_count) -> awaitable { - ceph_assert(!wait_handler); // one waiter at a time wait_for_count = target_count; - - use_awaitable_t token; - return boost::asio::async_initiate( - [this] (wait_handler_type h) { - wait_handler.emplace(std::move(h)); - }, token); + return waiter.get(); } void on_complete(child& c, std::exception_ptr eptr, @@ -222,21 +214,12 @@ class co_throttle_impl : } // maybe wake the waiter - if (wait_handler && count <= wait_for_count) { - wait_complete({}); + if (waiter.waiting() && count <= wait_for_count) { + auto eptr = std::exchange(unreported_exception, nullptr); + waiter.complete(eptr, {}); } } - void wait_complete(boost::system::error_code ec) - { - // bind arguments to the handler for dispatch - auto eptr = std::exchange(unreported_exception, nullptr); - auto c = boost::asio::append(std::move(*wait_handler), eptr, ec); - wait_handler.reset(); - - boost::asio::dispatch(std::move(c)); - } - struct child_completion { boost::intrusive_ptr impl; child& c; -- 2.39.5