From: Adam C. Emerson Date: Tue, 23 May 2023 00:15:05 +0000 (-0400) Subject: common/async: Add async_{dispatch,post,defer} X-Git-Tag: testing/wip-vshankar-testing-20250407.170244-debug~16^2~41 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=de7a223759b3fbaede0235a3c1fc5993838820af;p=ceph-ci.git common/async: Add async_{dispatch,post,defer} These functions let you run functions on arbitrary executors, providing a convenient way to mix I/O less critical sections on a strand in a coroutine. Signed-off-by: Adam C. Emerson --- diff --git a/src/common/async/async_call.h b/src/common/async/async_call.h new file mode 100644 index 00000000000..d118070fea6 --- /dev/null +++ b/src/common/async/async_call.h @@ -0,0 +1,240 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +/// \file common/async/async_call.h +/// +/// \brief Wait for functions to run on arbitrary executors + +#include +#include + +#include +#include +#include +#include + +namespace ceph::async { +/// \brief Dispatch a function on another executor and wait for it to +/// finish +/// +/// Useful with strands and similar +/// +/// \param executor The executor on which to call the function +/// \param f The function to call +/// \param token Boost.Asio completion token +/// \param ioe Parameter pack of executors and I/O Objects to keep live +/// +/// \return The return value of `f` in a way appropriate to the +/// completion token. See Boost.Asio documentation. +template F, + boost::asio::completion_token_for< + void(std::invoke_result_t)> CompletionToken, + typename ...IOE> +auto async_dispatch(boost::asio::execution::executor auto executor, + F&& f, CompletionToken&& token, IOE&& ...ioe) +{ + namespace asio = boost::asio; + return asio::async_compose)>( + [executor, f = std::move(f)](auto &self) mutable { + auto ex = executor; + asio::dispatch(ex, [f = std::move(f), + self = std::move(self)]() mutable { + auto r = std::invoke(f); + auto ex2 = self.get_executor(); + asio::dispatch(ex2, [r = std::move(r), + self = std::move(self)]() mutable { + self.complete(std::move(r)); + }); + }); + }, + token, executor, std::forward(ioe)...); +} + +/// \brief Dispatch a function on another executor and wait for it to +/// finish +/// +/// Useful with strands and similar +/// +/// \param executor The executor on which to call the function +/// \param f The function to call +/// \param token Boost.Asio completion token +/// \param ioe Parameter pack of executors and I/O Objects to keep live +/// +/// \return The return value of `f` in a way appropriate to the +/// completion token. See Boost.Asio documentation. +template F, + boost::asio::completion_token_for CompletionToken, + typename ...IOE> +auto async_dispatch(boost::asio::execution::executor auto executor, + F&& f, CompletionToken&& token, IOE&& ...ioe) + requires std::is_void_v> +{ + namespace asio = boost::asio; + return asio::async_compose< + CompletionToken, void()>( + [executor, f = std::move(f)] (auto& self) mutable { + auto ex = executor; + asio::dispatch(ex, [f = std::move(f), + self = std::move(self)]() mutable { + std::invoke(f); + auto ex2 = self.get_executor(); + asio::dispatch(ex2, [self = std::move(self)]() mutable { + self.complete(); + }); + }); + }, token, executor, std::forward(ioe)...); +} + +/// \brief Post a function on another executor and wait for it to +/// finish +/// +/// Useful with strands and similar +/// +/// \param executor The executor on which to call the function +/// \param f The function to call +/// \param token Boost.Asio completion token +/// \param ioe Parameter pack of executors and I/O Objects to keep live +/// +/// \return The return value of `f` in a way appropriate to the +/// completion token. See Boost.Asio documentation. +template F, + boost::asio::completion_token_for< + void(std::invoke_result_t)> CompletionToken, + typename ...IOE> +auto async_post(boost::asio::execution::executor auto executor, + F&& f, CompletionToken&& token, IOE&& ...ioe) +{ + namespace asio = boost::asio; + return asio::async_compose< + CompletionToken, + void(std::invoke_result_t)>( + [executor, f = std::move(f)] (auto& self) mutable { + auto ex = executor; + asio::post(ex, [f = std::move(f), self = std::move(self)]() mutable { + auto r = std::invoke(f); + auto ex2 = self.get_executor(); + asio::dispatch(ex2, [self = std::move(self), + r = std::move(r)]() mutable { + self.complete(std::move(r)); + }); + }); + }, token, executor, std::forward(ioe)...); +} + +/// \brief Post a function on another executor and wait for it to +/// finish +/// +/// Useful with strands and similar +/// +/// \param executor The executor on which to call the function +/// \param f The function to call +/// \param token Boost.Asio completion token +/// \param ioe Parameter pack of executors and I/O Objects to keep live +/// +/// \return The return value of `f` in a way appropriate to the +/// completion token. See Boost.Asio documentation. +template F, + boost::asio::completion_token_for CompletionToken, + typename ...IOE> +auto async_post(boost::asio::execution::executor auto executor, + F&& f, CompletionToken&& token, IOE&& ...ioe) + requires std::is_void_v> +{ + namespace asio = boost::asio; + return asio::async_compose< + CompletionToken, void()>( + [executor, f = std::move(f)] (auto& self) mutable { + auto ex = executor; + asio::post(ex, [f = std::move(f), self = std::move(self)]() mutable { + std::invoke(f); + auto ex2 = self.get_executor(); + asio::dispatch(ex2, [self = std::move(self)]() mutable { + self.complete(); + }); + }); + }, token, executor, std::forward(ioe)...); +} + +/// \brief Defer a function on another executor and wait for it to +/// finish +/// +/// Useful with strands and similar +/// +/// \param executor The executor on which to call the function +/// \param f The function to call +/// \param token Boost.Asio completion token +/// \param ioe Parameter pack of executors and I/O Objects to keep live +/// +/// \return The return value of `f` in a way appropriate to the +/// completion token. See Boost.Asio documentation. +template F, + boost::asio::completion_token_for< + void(std::invoke_result_t)> CompletionToken, + typename ...IOE> +auto async_defer(boost::asio::execution::executor auto executor, + F&& f, CompletionToken&& token, IOE&& ...ioe) +{ + namespace asio = boost::asio; + return asio::async_compose< + CompletionToken, + void(std::invoke_result_t)>( + [executor, f = std::move(f)] (auto& self) mutable { + auto ex = executor; + asio::defer(ex, [f = std::move(f), self = std::move(self)]() mutable { + auto r = std::invoke(f); + auto ex2 = self.get_executor(); + asio::dispatch(ex2, [r = std::move(r), + self = std::move(self)]() mutable { + self.complete(std::move(r)); + }); + }); + }, token, executor, std::forward(ioe)...); +} + +/// \brief Defer a function on another executor and wait for it to +/// finish +/// +/// Useful with strands and similar +/// +/// \param executor The executor on which to call the function +/// \param f The function to call +/// \param token Boost.Asio completion token +/// \param ioe Parameter pack of executors and I/O Objects to keep live +/// +/// \return The return value of `f` in a way appropriate to the +/// completion token. See Boost.Asio documentation. +template F, + boost::asio::completion_token_for CompletionToken, + typename ...IOE> +auto async_defer(boost::asio::execution::executor auto executor, F&& f, + CompletionToken&& token, IOE&& ...ioe) + requires std::is_void_v> +{ + namespace asio = boost::asio; + return asio::async_compose< + CompletionToken, void()>( + [executor, f = std::move(f)] (auto& self) mutable { + auto ex = executor; + asio::defer(ex, [f = std::move(f), self = std::move(self)]() mutable { + std::invoke(f); + auto ex2 = self.get_executor(); + asio::dispatch(ex2, [self = std::move(self)]() mutable { + self.complete(); + }); + }); + }, token, executor, std::forward(ioe)...); +} +} diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index 34f041e61de..1402fa24fd3 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -482,3 +482,8 @@ add_test(NAME unittest_decode_start_v_checker_expect_failure WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_tests_properties(unittest_decode_start_v_checker_expect_failure PROPERTIES WILL_FAIL TRUE) + +add_executable(unittest_async_call test_async_call.cc) +target_link_libraries(unittest_async_call librados ceph-common Boost::system + GTest::GTest) +add_ceph_unittest(unittest_async_call) diff --git a/src/test/common/test_async_call.cc b/src/test/common/test_async_call.cc new file mode 100644 index 00000000000..ceb6aa84c55 --- /dev/null +++ b/src/test/common/test_async_call.cc @@ -0,0 +1,267 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/async/async_call.h" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +namespace asio = boost::asio; +namespace chrono = std::chrono; +namespace sys = boost::system; +using namespace std::literals; + +using ceph::async::async_dispatch; +using ceph::async::async_post; +using ceph::async::async_defer; + +inline constexpr auto dur = 50ms; + +template +asio::awaitable> +wait_for(chrono::duration dur) +{ + asio::steady_timer timer{co_await asio::this_coro::executor, dur}; + co_await timer.async_wait(asio::use_awaitable); + co_return dur; +} + +TEST(AsyncDispatch, AsyncCall) +{ + asio::io_context c; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + auto x = co_await async_dispatch( + strand, [] {return 55;}, asio::use_awaitable); + EXPECT_EQ(x, 55); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); +} + +TEST(AsyncPost, AsyncCall) +{ + asio::io_context c; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + auto x = co_await async_post( + strand, [] {return 55;}, asio::use_awaitable); + EXPECT_EQ(x, 55); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); +} + +TEST(AsyncDefer, AsyncCall) +{ + asio::io_context c; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + auto x = co_await async_defer( + strand, [] {return 55;}, asio::use_awaitable); + EXPECT_EQ(x, 55); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); +} + +TEST(AsyncDispatchVoid, AsyncCall) +{ + asio::io_context c; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + co_await async_dispatch( + strand, [] {return;}, asio::use_awaitable); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); +} + +TEST(AsyncPostVoid, AsyncCall) +{ + asio::io_context c; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + co_await async_post( + strand, [] {return;}, asio::use_awaitable); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); +} + +TEST(AsyncDeferVoid, AsyncCall) +{ + asio::io_context c; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + co_await async_defer( + strand, [] {return;}, asio::use_awaitable); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); +} + +TEST(AsyncDispatchDeferred, AsyncCall) +{ + asio::io_context c; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + bool ran = false; + auto op = async_dispatch(strand, [&ran] { + ran = true; + return 55; + }, asio::deferred); + EXPECT_FALSE(ran); + std::move(op)([](int x) { + EXPECT_EQ(x, 55); + }); + EXPECT_TRUE(ran); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); +} + +TEST(AsyncDispatchDeferredVoid, AsyncCall) +{ + asio::io_context c; + bool ran = false; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + auto op = async_dispatch(strand, [&ran] { + ran = true; + return; + }, asio::deferred); + EXPECT_FALSE(ran); + std::move(op)([]() {}); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); + EXPECT_TRUE(ran); +} + +TEST(AsyncPostDeferred, AsyncCall) +{ + asio::io_context c; + bool ran = false; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + auto op = async_post(strand, [&ran] { + ran = true; + return 55; + }, asio::deferred); + EXPECT_FALSE(ran); + std::move(op)([](int x) { + EXPECT_EQ(x, 55); + }); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); + EXPECT_TRUE(ran); +} + +TEST(AsyncPostDeferredVoid, AsyncCall) +{ + asio::io_context c; + bool ran = false; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + auto op = async_post(strand, [&ran] { + ran = true; + return; + }, asio::deferred); + EXPECT_FALSE(ran); + std::move(op)([]() {}); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); + EXPECT_TRUE(ran); +} + +TEST(AsyncDeferDeferred, AsyncCall) +{ + asio::io_context c; + bool ran = false; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + auto op = async_defer(strand, [&ran] { + ran = true; + return 55; + }, asio::deferred); + EXPECT_FALSE(ran); + std::move(op)([](int x) { + EXPECT_EQ(x, 55); + }); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); + EXPECT_TRUE(ran); +} + +TEST(AsyncDeferDeferredVoid, AsyncCall) +{ + asio::io_context c; + bool ran = false; + asio::co_spawn(c, [&]() -> asio::awaitable { + auto strand = asio::make_strand(c.get_executor()); + auto op = async_defer(strand, [&ran] { + ran = true; + return; + }, asio::deferred); + EXPECT_FALSE(ran); + std::move(op)([]() {}); + co_return; + }, [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + c.run(); + EXPECT_TRUE(ran); +}