From c2e0c157d03adb16baa83dc968ec065de10f299a Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 22 Jul 2025 15:01:12 -0400 Subject: [PATCH] common/async: fold async_spawn_throttle_impl into spawn_throttle_impl spawn_throttle_impl was a virtual base class for the sync_ and async_ variants. with the sync variant removed, combine the classes into one and remove the use of virtual functions Signed-off-by: Casey Bodley --- src/common/async/detail/spawn_throttle_impl.h | 215 ++++++++---------- src/common/async/spawn_throttle.h | 7 +- 2 files changed, 96 insertions(+), 126 deletions(-) diff --git a/src/common/async/detail/spawn_throttle_impl.h b/src/common/async/detail/spawn_throttle_impl.h index c47aa2a518c66..4284d81c7246a 100644 --- a/src/common/async/detail/spawn_throttle_impl.h +++ b/src/common/async/detail/spawn_throttle_impl.h @@ -29,28 +29,39 @@ #include #include "common/async/cancel_on_error.h" #include "common/async/service.h" -#include "common/async/yield_context.h" namespace ceph::async::detail { struct spawn_throttle_handler; -// Reference-counted spawn throttle interface. +// Reference-counted spawn throttle implementation. class spawn_throttle_impl : public boost::intrusive_ref_counter + boost::thread_unsafe_counter>, + public service_list_base_hook { public: - spawn_throttle_impl(size_t limit, cancel_on_error on_error) - : limit(limit), on_error(on_error), + spawn_throttle_impl(boost::asio::yield_context yield, + size_t limit, cancel_on_error on_error) + : svc(boost::asio::use_service>( + boost::asio::query(yield.get_executor(), + boost::asio::execution::context))), + yield(yield), limit(limit), on_error(on_error), children(std::make_unique(limit)) { + // register for service_shutdown() notifications + svc.add(*this); + // initialize the free list for (size_t i = 0; i < limit; i++) { free.push_back(children[i]); } } - virtual ~spawn_throttle_impl() {} + + ~spawn_throttle_impl() + { + svc.remove(*this); + } // return the completion handler for a new child. may block due to throttling // or rethrow an exception from a previously-spawned child @@ -63,19 +74,42 @@ class spawn_throttle_impl : }; using executor_type = boost::asio::any_io_executor; - virtual executor_type get_executor() = 0; + executor_type get_executor() + { + return yield.get_executor(); + } // wait until count <= target_count - virtual void wait_for(size_t target_count) = 0; + void wait_for(size_t target_count) + { + if (count > target_count) { + wait_for_count = target_count; + + boost::asio::async_initiate( + [this] (auto handler) { + auto slot = get_associated_cancellation_slot(handler); + if (slot.is_connected()) { + slot.template emplace(this); + } + waiter.emplace(std::move(handler)); + }, yield); + // this is a coroutine, so the wait has completed by this point + } + + report_exception(); // throw unreported exception + } // cancel outstanding coroutines - virtual void cancel(bool shutdown) + void cancel() { cancel_outstanding_from(outstanding.begin()); + if (waiter) { + wait_complete(make_error_code(boost::asio::error::operation_aborted)); + } } // complete the given child coroutine - virtual void on_complete(child& c, std::exception_ptr eptr) + void on_complete(child& c, std::exception_ptr eptr) { --count; @@ -98,9 +132,21 @@ class spawn_throttle_impl : } cancel_outstanding_from(cancel_from); } + + if (waiter && count <= wait_for_count) { + wait_complete({}); + } + } + + + void service_shutdown() + { + waiter.reset(); } - protected: + private: + service& svc; + boost::asio::yield_context yield; const size_t limit; const cancel_on_error on_error; size_t count = 0; @@ -112,7 +158,6 @@ class spawn_throttle_impl : } } - private: std::exception_ptr unreported_exception; std::unique_ptr children; @@ -130,6 +175,42 @@ class spawn_throttle_impl : c.signal->emit(boost::asio::cancellation_type::terminal); } } + + using WaitSignature = void(boost::system::error_code); + struct wait_state { + using Work = boost::asio::executor_work_guard< + boost::asio::any_io_executor>; + using Handler = typename boost::asio::async_result< + boost::asio::yield_context, WaitSignature>::handler_type; + + Work work; + Handler handler; + + explicit wait_state(Handler&& h) + : work(make_work_guard(h)), + handler(std::move(h)) + {} + }; + std::optional waiter; + size_t wait_for_count = 0; + + struct op_cancellation { + spawn_throttle_impl* self; + explicit op_cancellation(spawn_throttle_impl* self) noexcept + : self(self) {} + void operator()(boost::asio::cancellation_type type) { + if (type != boost::asio::cancellation_type::none) { + self->cancel(); + } + } + }; + + void wait_complete(boost::system::error_code ec) + { + auto w = std::move(*waiter); + waiter.reset(); + boost::asio::dispatch(boost::asio::append(std::move(w.handler), ec)); + } }; // A cancellable spawn() completion handler that notifies the spawn_throttle @@ -184,114 +265,4 @@ inline spawn_throttle_handler spawn_throttle_impl::get() return {this, c}; } -class async_spawn_throttle_impl final : - public spawn_throttle_impl, - public service_list_base_hook -{ - public: - async_spawn_throttle_impl(boost::asio::yield_context yield, - size_t limit, cancel_on_error on_error) - : spawn_throttle_impl(limit, on_error), - svc(boost::asio::use_service>( - boost::asio::query(yield.get_executor(), - boost::asio::execution::context))), - yield(yield) - { - // register for service_shutdown() notifications - svc.add(*this); - } - - ~async_spawn_throttle_impl() - { - svc.remove(*this); - } - - executor_type get_executor() override - { - return yield.get_executor(); - } - - void service_shutdown() - { - waiter.reset(); - } - - private: - service& svc; - boost::asio::yield_context yield; - - using WaitSignature = void(boost::system::error_code); - struct wait_state { - using Work = boost::asio::executor_work_guard< - boost::asio::any_io_executor>; - using Handler = typename boost::asio::async_result< - boost::asio::yield_context, WaitSignature>::handler_type; - - Work work; - Handler handler; - - explicit wait_state(Handler&& h) - : work(make_work_guard(h)), - handler(std::move(h)) - {} - }; - std::optional waiter; - size_t wait_for_count = 0; - - struct op_cancellation { - async_spawn_throttle_impl* self; - explicit op_cancellation(async_spawn_throttle_impl* self) noexcept - : self(self) {} - void operator()(boost::asio::cancellation_type type) { - if (type != boost::asio::cancellation_type::none) { - self->cancel(false); - } - } - }; - - void wait_for(size_t target_count) override - { - if (count > target_count) { - wait_for_count = target_count; - - boost::asio::async_initiate( - [this] (auto handler) { - auto slot = get_associated_cancellation_slot(handler); - if (slot.is_connected()) { - slot.template emplace(this); - } - waiter.emplace(std::move(handler)); - }, yield); - // this is a coroutine, so the wait has completed by this point - } - - report_exception(); // throw unreported exception - } - - void wait_complete(boost::system::error_code ec) - { - auto w = std::move(*waiter); - waiter.reset(); - boost::asio::dispatch(boost::asio::append(std::move(w.handler), ec)); - } - - void on_complete(child& c, std::exception_ptr eptr) override - { - spawn_throttle_impl::on_complete(c, eptr); - - if (waiter && count <= wait_for_count) { - wait_complete({}); - } - } - - void cancel(bool shutdown) override - { - spawn_throttle_impl::cancel(shutdown); - - if (waiter) { - wait_complete(make_error_code(boost::asio::error::operation_aborted)); - } - } -}; - } // namespace ceph::async::detail diff --git a/src/common/async/spawn_throttle.h b/src/common/async/spawn_throttle.h index 41aae346b1bb6..51826ea0e9f06 100644 --- a/src/common/async/spawn_throttle.h +++ b/src/common/async/spawn_throttle.h @@ -19,7 +19,6 @@ #include #include "cancel_on_error.h" -#include "yield_context.h" namespace ceph::async { @@ -60,7 +59,7 @@ class spawn_throttle { public: spawn_throttle(boost::asio::yield_context yield, size_t limit, cancel_on_error on_error = cancel_on_error::none) - : impl(new detail::async_spawn_throttle_impl(yield, limit, on_error)) + : impl(new detail::spawn_throttle_impl(yield, limit, on_error)) {} spawn_throttle(spawn_throttle&&) = default; @@ -73,7 +72,7 @@ class spawn_throttle { ~spawn_throttle() { if (impl) { - impl->cancel(true); + impl->cancel(); } } @@ -117,7 +116,7 @@ class spawn_throttle { /// Cancel all outstanding coroutines. void cancel() { - impl->cancel(false); + impl->cancel(); } }; -- 2.39.5