From b0f2bb760cc81324ab1ea209dff8f008fe320bc5 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Thu, 14 Aug 2025 21:37:07 -0400 Subject: [PATCH] common/async: Add `redirect_error.h` Straightforwardly update `redirect_error` to work with Asio's `disposition` concept generally. No point in sending upstream because Asio never accepts contributions. Signed-off-by: Adam C. Emerson --- COPYING | 6 + src/common/async/redirect_error.h | 344 +++++++++++++++++++ src/test/common/CMakeLists.txt | 3 + src/test/common/test_async_redirect_error.cc | 276 +++++++++++++++ 4 files changed, 629 insertions(+) create mode 100644 src/common/async/redirect_error.h create mode 100644 src/test/common/test_async_redirect_error.cc diff --git a/COPYING b/COPYING index 3b758d58155..52394f5bcb5 100644 --- a/COPYING +++ b/COPYING @@ -211,6 +211,12 @@ Copyright: 2020 Red Hat 2003-2019 Christopher M. Kohlhoff License: Boost Software License, Version 1.0 +Files: src/common/async/redirect_error.h, + src/test/common/test_async_redirect_error.cc +Copyright: 2025 Contributors to the Ceph Project + 2003-2024 Christopher M. Kohlhoff +License: Boost Software License, Version 1.0 + Files: src/script/backport-create-issue Copyright: 2015 Red Hat 2018 SUSE LLC diff --git a/src/common/async/redirect_error.h b/src/common/async/redirect_error.h new file mode 100644 index 00000000000..e55ee49e550 --- /dev/null +++ b/src/common/async/redirect_error.h @@ -0,0 +1,344 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp +/* + * Ceph - scalable distributed file system + */ + +// Copyright: 2025 Contributors to the Ceph Project +// Based on boost/asio/redirect_error.hpp and +// boost/asio/impl/redirect_error.hpp which are +// Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#include +#include +#include +#include +#include + +/// \file common/async/redirect_error.h +/// +/// \brief `redirect_error` that knows about `dispositions`. +/// +/// Asio has a very useful concept called a `disposition` that +/// generalizes the notion of an error code. Unfortunately +/// `redirect_error` doesn't know about dispositions, making it less +/// useful. + +namespace ceph::async { + +/// A @ref completion_token adapter used to specify that an error +/// produced by an asynchronous operation is captured to a specified +/// variable. The variable must be of a `disposition` type. +/** + * The redirect_error_t class is used to indicate that any disposition produced + * by an asynchronous operation is captured to a specified variable. + */ +template +class redirect_error_t { +public: // Sigh + CompletionToken token; + Disposition& disposition; + + template + redirect_error_t(CT&& token, Disposition& disposition) + : token(std::forward(token)), disposition(disposition) {} +}; + +/// A function object type that adapts a @ref completion_token to capture +/// disposition values to a variable. +/** + * May also be used directly as a completion token, in which case it adapts the + * asynchronous operation's default completion token (or boost::asio::deferred + * if no default is available). + */ +template +class partial_redirect_error { +public: + Disposition& disposition; + + /// Constructor that specifies the variable used to capture disposition values. + explicit partial_redirect_error(Disposition& disposition) + : disposition(disposition) {} + + /// Adapt a @ref completion_token to specify that the completion handler + /// should capture disposition values to a variable. + template + [[nodiscard]] constexpr inline auto + operator ()(CompletionToken&& token) const { + return redirect_error_t, Disposition>{ + std::forward(token), disposition}; + } +}; + +/// Create a partial completion token adapter that captures disposition values +/// to a variable. +template +[[nodiscard]] inline auto redirect_error(Disposition& d) +{ + return partial_redirect_error>{d}; +} + +/// Adapt a @ref completion_token to capture disposition values to a variable. +template +[[nodiscard]] inline auto redirect_error(CompletionToken&& token, + Disposition& d) +{ + return redirect_error_t, + std::decay_t>{ + std::forward(token), d}; +} + +namespace detail { +template +class redirect_error_handler { +public: + Disposition& disposition; + // Essentially a call-once function, invoked as an rvalue. + Handler handler; + + using result_type = void; + template + redirect_error_handler( + redirect_error_t, + std::decay_t> re) + : disposition(re.disposition), handler(std::move(re.token)) {} + + template + redirect_error_handler(Disposition &disposition, RedirectedHandler&& handler) + : disposition(disposition), + handler(std::forward(handler)) {} + + + void operator ()() { + std::move(handler)(); + } + + template + std::enable_if_t, + Disposition>> + operator ()(Arg0&& arg0, Args ...args) { + std::move(handler)(std::forward(arg0), std::forward(args)...); + } + + template + void operator ()(const Disposition& d, Args&& ...args) { + disposition = d; + std::move(handler)(std::forward(args)...); + } +}; + +template +inline bool asio_handler_is_continuation( + redirect_error_handler* this_handler) +{ + using boost::asio::asio_handler_is_continuation; + return asio_handler_is_continuation(&this_handler->handler); +} + +template +struct redirect_error_signature +{ + using type = Signature; +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...); +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...); +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...) &; +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...) &; +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...) &&; +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...) &&; +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...) & noexcept; +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...) & noexcept; +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...) & noexcept; +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...) & noexcept; +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...) && noexcept; +}; + +template +struct redirect_error_signature +{ + typedef R type(Args...) && noexcept; +}; + +template +class initiation_base : public Initiation +{ +public: + template + explicit initiation_base(I&& initiation) + : Initiation(std::forward(initiation)) {} +}; + +template +class initiation_base>> +{ +public: + template + explicit initiation_base(I&& initiation) + : initiation(std::forward(initiation)) {} + + template + void operator()(Args&&... args) const + { + initiation(std::forward(args)...); + } + +private: + Initiation initiation; +}; +} // namespace detail +} // namespace ceph::async + +namespace boost::asio { + +template +struct async_result<::ceph::async::redirect_error_t, Signature> + : async_result::type> +{ + template + struct init_wrapper : ::ceph::async::detail::initiation_base + { + using ::ceph::async::detail::initiation_base::initiation_base; + + template + void operator ()(Handler&& handler, Disposition* d, Args&&... args) && + { + static_cast(*this)( + ::ceph::async::detail::redirect_error_handler< + decay_t, Disposition>( + *d, std::forward(handler)), + std::forward(args)...); + } + + template + void operator ()(Handler&& handler, Disposition* d, Args&&... args) const & + { + static_cast(*this)( + ::ceph::async::detail::redirect_error_handler< + decay_t, Disposition>( + *d, std::forward(handler)), + std::forward(args)...); + } + }; + + template + static auto initiate(Initiation&& initiation, + RawCompletionToken&& token, Args&&... args) + { + return async_initiate< + std::conditional_t< + std::is_const_v>, + const CompletionToken, CompletionToken>, + typename ::ceph::async::detail::redirect_error_signature::type>( + init_wrapper>( + std::forward(initiation)), + token.token, &token.disposition, std::forward(args)...); + } +}; + +template class Associator, typename Handler, + typename DefaultCandidate, typename Disposition> +struct associator, + DefaultCandidate> + : Associator +{ + static auto get(const ::ceph::async::detail::redirect_error_handler< + Handler, Disposition>& h) noexcept + { + return Associator::get(h.handler); + } + + static auto get(const ::ceph::async::detail::redirect_error_handler< + Handler, Disposition>& h, + const DefaultCandidate& c) noexcept + { + return Associator::get(h.handler, c); + } +}; + +template +struct async_result<::ceph::async::partial_redirect_error, + Signatures...> +{ + template + static auto initiate(Initiation&& initiation, + RawCompletionToken&& token, Args&&... args) + { + return async_initiate( + std::forward(initiation), + ::ceph::async::redirect_error_t< + default_completion_token_t>, + Disposition>( + default_completion_token_t>{}, + token.disposition), std::forward(args)...); + } +}; +} // namespace boost::asio diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index f827ce9eff3..7624abf2dee 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -506,3 +506,6 @@ add_executable(unittest_async_cond test_async_cond.cc) target_link_libraries(unittest_async_cond GTest::GTest) add_ceph_unittest(unittest_async_cond) +add_executable(unittest_async_redirect_error test_async_redirect_error.cc) +target_link_libraries(unittest_async_redirect_error GTest::GTest) +add_ceph_unittest(unittest_async_redirect_error) diff --git a/src/test/common/test_async_redirect_error.cc b/src/test/common/test_async_redirect_error.cc new file mode 100644 index 00000000000..82eef2d0ecb --- /dev/null +++ b/src/test/common/test_async_redirect_error.cc @@ -0,0 +1,276 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp +/* + * Ceph - scalable distributed file system + */ + +// Based on libs/asio/test/redirect_error.cpp which is +// Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying copy at http://www.boost.org/LICENSE_1_0.txt) + +#include "common/async/redirect_error.h" + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +struct redirect_error_handler +{ + int* count; + + explicit redirect_error_handler(int* c) + : count(c) {} + + void operator ()() + { + ++(*count); + } +}; + +TEST(RedirectError, RedirectError) +{ + boost::asio::io_context io1; + boost::asio::io_context io2; + boost::asio::system_timer timer1(io1); + boost::system::error_code ec = boost::asio::error::would_block; + int count = 0; + + timer1.expires_after(boost::asio::chrono::seconds(0)); + timer1.async_wait( + ceph::async::redirect_error( + boost::asio::bind_executor(io2.get_executor(), + redirect_error_handler(&count)), ec)); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(0, count); + + io1.run(); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(0, count); + + io2.run(); + + ASSERT_FALSE(ec); + ASSERT_EQ(1, count); + + ec = boost::asio::error::would_block; + timer1.async_wait( + ceph::async::redirect_error( + boost::asio::bind_executor(io2.get_executor(), + boost::asio::deferred), ec))(redirect_error_handler(&count)); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(1, count); + + io1.restart(); + io1.run(); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(1, count); + + io2.restart(); + io2.run(); + + ASSERT_FALSE(ec); + ASSERT_EQ(2, count); + + ec = boost::asio::error::would_block; + std::future f = timer1.async_wait( + ceph::async::redirect_error( + boost::asio::bind_executor(io2.get_executor(), + boost::asio::use_future), ec)); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(std::future_status::timeout, f.wait_for(std::chrono::seconds(0))); + + io1.restart(); + io1.run(); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(std::future_status::timeout, f.wait_for(std::chrono::seconds(0))); + + io2.restart(); + io2.run(); + + ASSERT_FALSE(ec); + ASSERT_EQ(std::future_status::ready, f.wait_for(std::chrono::seconds(0))); +} + +TEST(RedirectError, RedirectErrorExceptionPtr) +{ + boost::asio::io_context io; + std::exception_ptr eptr = nullptr; + + int count = 0; + + boost::asio::co_spawn( + io, []() -> boost::asio::awaitable { + throw std::exception{}; + co_return; + }, + ceph::async::redirect_error( + boost::asio::bind_executor(io.get_executor(), + redirect_error_handler(&count)), eptr)); + + ASSERT_FALSE(eptr); + ASSERT_EQ(0, count); + + io.run(); + + ASSERT_TRUE(eptr); + ASSERT_EQ(1, count); + + boost::asio::co_spawn( + io, []() -> boost::asio::awaitable { + co_return; + }, + ceph::async::redirect_error( + boost::asio::bind_executor(io.get_executor(), + redirect_error_handler(&count)), eptr)); + ASSERT_TRUE(eptr); + ASSERT_EQ(1, count); + + io.restart(); + io.run(); + + ASSERT_FALSE(eptr); + ASSERT_EQ(2, count); +} + +TEST(RedirectError, PartialRedirectError) +{ + boost::asio::io_context io1; + boost::asio::io_context io2; + boost::asio::system_timer timer1(io1); + boost::system::error_code ec = boost::asio::error::would_block; + int count = 0; + + timer1.expires_after(boost::asio::chrono::seconds(0)); + timer1.async_wait(ceph::async::redirect_error(ec))( + boost::asio::bind_executor(io2.get_executor(), + redirect_error_handler(&count))); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(0, count); + + io1.run(); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(0, count); + + io2.run(); + + ASSERT_FALSE(ec); + ASSERT_EQ(1, count); + + ec = boost::asio::error::would_block; + timer1.async_wait(ceph::async::redirect_error(ec))( + boost::asio::bind_executor(io2.get_executor(), + boost::asio::deferred))(redirect_error_handler(&count)); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(1, count); + + io1.restart(); + io1.run(); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(1, count); + + io2.restart(); + io2.run(); + + ASSERT_FALSE(ec); + ASSERT_EQ(2, count); + + ec = boost::asio::error::would_block; + timer1.async_wait()(ceph::async::redirect_error(ec))( + boost::asio::bind_executor(io2.get_executor(), + boost::asio::deferred))(redirect_error_handler(&count)); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(2, count); + + io1.restart(); + io1.run(); + + ASSERT_EQ(boost::asio::error::would_block, ec); + ASSERT_EQ(2, count); + + io2.restart(); + io2.run(); + + ASSERT_FALSE(ec); + ASSERT_EQ(3, count); + + ec = boost::asio::error::would_block; + std::future f = timer1.async_wait(ceph::async::redirect_error(ec))( + boost::asio::bind_executor(io2.get_executor(), boost::asio::use_future)); + + ASSERT_EQ(boost::asio::error::would_block, ec); + EXPECT_EQ(std::future_status::timeout, f.wait_for(std::chrono::seconds(0))); + + io1.restart(); + io1.run(); + + EXPECT_EQ(boost::asio::error::would_block, ec); + EXPECT_EQ(std::future_status::timeout, f.wait_for(std::chrono::seconds(0))); + + io2.restart(); + io2.run(); + + EXPECT_FALSE(ec); + EXPECT_EQ(std::future_status::ready, f.wait_for(std::chrono::seconds(0))); +} + +TEST(RedirectError, PartialRedirectErrorExceptionPtr) +{ + boost::asio::io_context io; + std::exception_ptr eptr = nullptr; + + int count = 0; + + boost::asio::co_spawn( + io, []() -> boost::asio::awaitable { + throw std::exception{}; + co_return; + }, + ceph::async::redirect_error(eptr)(boost::asio::bind_executor( + io.get_executor(), + redirect_error_handler(&count)))); + + ASSERT_FALSE(eptr); + ASSERT_EQ(0, count); + + io.run(); + + ASSERT_TRUE(eptr); + ASSERT_EQ(1, count); + + boost::asio::co_spawn( + io, []() -> boost::asio::awaitable { + co_return; + }, + ceph::async::redirect_error(eptr)(boost::asio::bind_executor( + io.get_executor(), + redirect_error_handler(&count)))); + ASSERT_TRUE(eptr); + ASSERT_EQ(1, count); + + io.restart(); + io.run(); + + ASSERT_FALSE(eptr); + ASSERT_EQ(2, count); +} -- 2.39.5