#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include "common/async/cancel_on_error.h"
#include "common/async/service.h"
-#include "common/async/yield_context.h"
namespace ceph::async::detail {
struct spawn_throttle_handler;
-// Reference-counted spawn throttle interface.
+// Reference-counted spawn throttle implementation.
class spawn_throttle_impl :
public boost::intrusive_ref_counter<spawn_throttle_impl,
- boost::thread_unsafe_counter>
+ boost::thread_unsafe_counter>,
+ public service_list_base_hook
{
public:
- spawn_throttle_impl(size_t limit, cancel_on_error on_error)
- : limit(limit), on_error(on_error),
+ spawn_throttle_impl(boost::asio::yield_context yield,
+ size_t limit, cancel_on_error on_error)
+ : svc(boost::asio::use_service<service<spawn_throttle_impl>>(
+ boost::asio::query(yield.get_executor(),
+ boost::asio::execution::context))),
+ yield(yield), limit(limit), on_error(on_error),
children(std::make_unique<child[]>(limit))
{
+ // register for service_shutdown() notifications
+ svc.add(*this);
+
// initialize the free list
for (size_t i = 0; i < limit; i++) {
free.push_back(children[i]);
}
}
- virtual ~spawn_throttle_impl() {}
+
+ ~spawn_throttle_impl()
+ {
+ svc.remove(*this);
+ }
// return the completion handler for a new child. may block due to throttling
// or rethrow an exception from a previously-spawned child
};
using executor_type = boost::asio::any_io_executor;
- virtual executor_type get_executor() = 0;
+ executor_type get_executor()
+ {
+ return yield.get_executor();
+ }
// wait until count <= target_count
- virtual void wait_for(size_t target_count) = 0;
+ void wait_for(size_t target_count)
+ {
+ if (count > target_count) {
+ wait_for_count = target_count;
+
+ boost::asio::async_initiate<boost::asio::yield_context, WaitSignature>(
+ [this] (auto handler) {
+ auto slot = get_associated_cancellation_slot(handler);
+ if (slot.is_connected()) {
+ slot.template emplace<op_cancellation>(this);
+ }
+ waiter.emplace(std::move(handler));
+ }, yield);
+ // this is a coroutine, so the wait has completed by this point
+ }
+
+ report_exception(); // throw unreported exception
+ }
// cancel outstanding coroutines
- virtual void cancel(bool shutdown)
+ void cancel()
{
cancel_outstanding_from(outstanding.begin());
+ if (waiter) {
+ wait_complete(make_error_code(boost::asio::error::operation_aborted));
+ }
}
// complete the given child coroutine
- virtual void on_complete(child& c, std::exception_ptr eptr)
+ void on_complete(child& c, std::exception_ptr eptr)
{
--count;
}
cancel_outstanding_from(cancel_from);
}
+
+ if (waiter && count <= wait_for_count) {
+ wait_complete({});
+ }
+ }
+
+
+ void service_shutdown()
+ {
+ waiter.reset();
}
- protected:
+ private:
+ service<spawn_throttle_impl>& svc;
+ boost::asio::yield_context yield;
const size_t limit;
const cancel_on_error on_error;
size_t count = 0;
}
}
- private:
std::exception_ptr unreported_exception;
std::unique_ptr<child[]> children;
c.signal->emit(boost::asio::cancellation_type::terminal);
}
}
+
+ using WaitSignature = void(boost::system::error_code);
+ struct wait_state {
+ using Work = boost::asio::executor_work_guard<
+ boost::asio::any_io_executor>;
+ using Handler = typename boost::asio::async_result<
+ boost::asio::yield_context, WaitSignature>::handler_type;
+
+ Work work;
+ Handler handler;
+
+ explicit wait_state(Handler&& h)
+ : work(make_work_guard(h)),
+ handler(std::move(h))
+ {}
+ };
+ std::optional<wait_state> waiter;
+ size_t wait_for_count = 0;
+
+ struct op_cancellation {
+ spawn_throttle_impl* self;
+ explicit op_cancellation(spawn_throttle_impl* self) noexcept
+ : self(self) {}
+ void operator()(boost::asio::cancellation_type type) {
+ if (type != boost::asio::cancellation_type::none) {
+ self->cancel();
+ }
+ }
+ };
+
+ void wait_complete(boost::system::error_code ec)
+ {
+ auto w = std::move(*waiter);
+ waiter.reset();
+ boost::asio::dispatch(boost::asio::append(std::move(w.handler), ec));
+ }
};
// A cancellable spawn() completion handler that notifies the spawn_throttle
return {this, c};
}
-class async_spawn_throttle_impl final :
- public spawn_throttle_impl,
- public service_list_base_hook
-{
- public:
- async_spawn_throttle_impl(boost::asio::yield_context yield,
- size_t limit, cancel_on_error on_error)
- : spawn_throttle_impl(limit, on_error),
- svc(boost::asio::use_service<service<async_spawn_throttle_impl>>(
- boost::asio::query(yield.get_executor(),
- boost::asio::execution::context))),
- yield(yield)
- {
- // register for service_shutdown() notifications
- svc.add(*this);
- }
-
- ~async_spawn_throttle_impl()
- {
- svc.remove(*this);
- }
-
- executor_type get_executor() override
- {
- return yield.get_executor();
- }
-
- void service_shutdown()
- {
- waiter.reset();
- }
-
- private:
- service<async_spawn_throttle_impl>& svc;
- boost::asio::yield_context yield;
-
- using WaitSignature = void(boost::system::error_code);
- struct wait_state {
- using Work = boost::asio::executor_work_guard<
- boost::asio::any_io_executor>;
- using Handler = typename boost::asio::async_result<
- boost::asio::yield_context, WaitSignature>::handler_type;
-
- Work work;
- Handler handler;
-
- explicit wait_state(Handler&& h)
- : work(make_work_guard(h)),
- handler(std::move(h))
- {}
- };
- std::optional<wait_state> waiter;
- size_t wait_for_count = 0;
-
- struct op_cancellation {
- async_spawn_throttle_impl* self;
- explicit op_cancellation(async_spawn_throttle_impl* self) noexcept
- : self(self) {}
- void operator()(boost::asio::cancellation_type type) {
- if (type != boost::asio::cancellation_type::none) {
- self->cancel(false);
- }
- }
- };
-
- void wait_for(size_t target_count) override
- {
- if (count > target_count) {
- wait_for_count = target_count;
-
- boost::asio::async_initiate<boost::asio::yield_context, WaitSignature>(
- [this] (auto handler) {
- auto slot = get_associated_cancellation_slot(handler);
- if (slot.is_connected()) {
- slot.template emplace<op_cancellation>(this);
- }
- waiter.emplace(std::move(handler));
- }, yield);
- // this is a coroutine, so the wait has completed by this point
- }
-
- report_exception(); // throw unreported exception
- }
-
- void wait_complete(boost::system::error_code ec)
- {
- auto w = std::move(*waiter);
- waiter.reset();
- boost::asio::dispatch(boost::asio::append(std::move(w.handler), ec));
- }
-
- void on_complete(child& c, std::exception_ptr eptr) override
- {
- spawn_throttle_impl::on_complete(c, eptr);
-
- if (waiter && count <= wait_for_count) {
- wait_complete({});
- }
- }
-
- void cancel(bool shutdown) override
- {
- spawn_throttle_impl::cancel(shutdown);
-
- if (waiter) {
- wait_complete(make_error_code(boost::asio::error::operation_aborted));
- }
- }
-};
-
} // namespace ceph::async::detail