#include <boost/intrusive/list.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include "common/async/cancel_on_error.h"
+#include "common/async/co_waiter.h"
#include "common/async/service.h"
#include "include/ceph_assert.h"
c.canceled = true;
c.signal->emit(boost::asio::cancellation_type::terminal);
}
- if (wait_handler) {
- wait_complete(make_error_code(boost::asio::error::operation_aborted));
+ if (waiter.waiting()) {
+ auto eptr = std::exchange(unreported_exception, nullptr);
+ auto ec = make_error_code(boost::asio::error::operation_aborted);
+ waiter.complete(eptr, ec);
}
}
void service_shutdown()
{
- wait_handler.reset();
+ waiter.shutdown();
}
private:
child_list outstanding;
child_list free;
- using use_awaitable_t = boost::asio::use_awaitable_t<executor_type>;
-
- using wait_signature = void(std::exception_ptr, boost::system::error_code);
- using wait_handler_type = typename boost::asio::async_result<
- use_awaitable_t, wait_signature>::handler_type;
- std::optional<wait_handler_type> wait_handler;
+ co_waiter<boost::system::error_code, executor_type> waiter;
// return an awaitable that completes once count <= target_count
auto wait_for(size_type target_count)
-> awaitable<boost::system::error_code>
{
- ceph_assert(!wait_handler); // one waiter at a time
wait_for_count = target_count;
-
- use_awaitable_t token;
- return boost::asio::async_initiate<use_awaitable_t, wait_signature>(
- [this] (wait_handler_type h) {
- wait_handler.emplace(std::move(h));
- }, token);
+ return waiter.get();
}
void on_complete(child& c, std::exception_ptr eptr,
}
// maybe wake the waiter
- if (wait_handler && count <= wait_for_count) {
- wait_complete({});
+ if (waiter.waiting() && count <= wait_for_count) {
+ auto eptr = std::exchange(unreported_exception, nullptr);
+ waiter.complete(eptr, {});
}
}
- void wait_complete(boost::system::error_code ec)
- {
- // bind arguments to the handler for dispatch
- auto eptr = std::exchange(unreported_exception, nullptr);
- auto c = boost::asio::append(std::move(*wait_handler), eptr, ec);
- wait_handler.reset();
-
- boost::asio::dispatch(std::move(c));
- }
-
struct child_completion {
boost::intrusive_ptr<co_throttle_impl> impl;
child& c;