From: Casey Bodley Date: Mon, 30 Jan 2023 01:30:30 +0000 (-0500) Subject: common/async: add co_waiter class template X-Git-Tag: v20.0.0~1421^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=26ee0696a61774d95980062dd0b09bc578dd4a45;p=ceph.git common/async: add co_waiter class template Signed-off-by: Casey Bodley --- diff --git a/src/common/async/co_waiter.h b/src/common/async/co_waiter.h new file mode 100644 index 000000000000..098ff1f26b67 --- /dev/null +++ b/src/common/async/co_waiter.h @@ -0,0 +1,166 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include "include/ceph_assert.h" + +namespace ceph::async { + +/// Captures an awaitable handler for deferred completion or cancellation. +template +class co_waiter { + using signature = void(std::exception_ptr, Ret); + using token_type = boost::asio::use_awaitable_t; + using handler_type = typename boost::asio::async_result< + token_type, signature>::handler_type; + std::optional handler; + + struct op_cancellation { + co_waiter* self; + op_cancellation(co_waiter* self) : self(self) {} + void operator()(boost::asio::cancellation_type_t type) { + if (type != boost::asio::cancellation_type::none) { + self->cancel(); + } + } + }; + public: + co_waiter() = default; + + // copy and move are disabled because the cancellation handler captures 'this' + co_waiter(const co_waiter&) = delete; + co_waiter& operator=(const co_waiter&) = delete; + + /// Returns true if there's a handler awaiting completion. + bool waiting() const { return handler.has_value(); } + + /// Returns an awaitable that blocks until complete() or cancel(). + boost::asio::awaitable get() + { + ceph_assert(!handler); + token_type token; + return boost::asio::async_initiate( + [this] (handler_type h) { + auto slot = boost::asio::get_associated_cancellation_slot(h); + if (slot.is_connected()) { + slot.template emplace(this); + } + handler.emplace(std::move(h)); + }, token); + } + + /// Schedule the completion handler with the given arguments. + void complete(std::exception_ptr eptr, Ret value) + { + ceph_assert(handler); + auto h = boost::asio::append(std::move(*handler), eptr, std::move(value)); + handler.reset(); + boost::asio::dispatch(std::move(h)); + } + + /// Cancel the coroutine with an operation_aborted exception. + void cancel() + { + if (handler) { + auto eptr = std::make_exception_ptr( + boost::system::system_error( + boost::asio::error::operation_aborted)); + complete(eptr, Ret{}); + } + } + + /// Destroy the completion handler. + void shutdown() + { + handler.reset(); + } +}; + +// specialization for Ret=void +template +class co_waiter { + using signature = void(std::exception_ptr); + using token_type = boost::asio::use_awaitable_t; + using handler_type = typename boost::asio::async_result< + token_type, signature>::handler_type; + std::optional handler; + + struct op_cancellation { + co_waiter* self; + op_cancellation(co_waiter* self) : self(self) {} + void operator()(boost::asio::cancellation_type_t type) { + if (type != boost::asio::cancellation_type::none) { + self->cancel(); + } + } + }; + public: + co_waiter() = default; + + // copy and move are disabled because the cancellation handler captures 'this' + co_waiter(const co_waiter&) = delete; + co_waiter& operator=(const co_waiter&) = delete; + + /// Returns true if there's a handler awaiting completion. + bool waiting() const { return handler.has_value(); } + + /// Returns an awaitable that blocks until complete() or cancel(). + boost::asio::awaitable get() + { + ceph_assert(!handler); + token_type token; + return boost::asio::async_initiate( + [this] (handler_type h) { + auto slot = boost::asio::get_associated_cancellation_slot(h); + if (slot.is_connected()) { + slot.template emplace(this); + } + handler.emplace(std::move(h)); + }, token); + } + + /// Schedule the completion handler with the given arguments. + void complete(std::exception_ptr eptr) + { + ceph_assert(handler); + auto h = boost::asio::append(std::move(*handler), eptr); + handler.reset(); + boost::asio::dispatch(std::move(h)); + } + + /// Cancel the coroutine with an operation_aborted exception. + void cancel() + { + if (handler) { + auto eptr = std::make_exception_ptr( + boost::system::system_error( + boost::asio::error::operation_aborted)); + complete(eptr); + } + } + + /// Destroy the completion handler. + void shutdown() + { + handler.reset(); + } +}; + +} // namespace ceph::async