--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+/// \file common/async/async_call.h
+///
+/// \brief Wait for functions to run on arbitrary executors
+
+#include <concepts>
+#include <type_traits>
+
+#include <boost/asio/compose.hpp>
+#include <boost/asio/defer.hpp>
+#include <boost/asio/dispatch.hpp>
+#include <boost/asio/post.hpp>
+
+namespace ceph::async {
+/// \brief Dispatch a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+ boost::asio::completion_token_for<
+ void(std::invoke_result_t<F>)> CompletionToken,
+ typename ...IOE>
+auto async_dispatch(boost::asio::execution::executor auto executor,
+ F&& f, CompletionToken&& token, IOE&& ...ioe)
+{
+ namespace asio = boost::asio;
+ return asio::async_compose<CompletionToken, void(std::invoke_result_t<F>)>(
+ [executor, f = std::move(f)](auto &self) mutable {
+ auto ex = executor;
+ asio::dispatch(ex, [f = std::move(f),
+ self = std::move(self)]() mutable {
+ auto r = std::invoke(f);
+ auto ex2 = self.get_executor();
+ asio::dispatch(ex2, [r = std::move(r),
+ self = std::move(self)]() mutable {
+ self.complete(std::move(r));
+ });
+ });
+ },
+ token, executor, std::forward<IOE>(ioe)...);
+}
+
+/// \brief Dispatch a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+ boost::asio::completion_token_for<void()> CompletionToken,
+ typename ...IOE>
+auto async_dispatch(boost::asio::execution::executor auto executor,
+ F&& f, CompletionToken&& token, IOE&& ...ioe)
+ requires std::is_void_v<std::invoke_result_t<F>>
+{
+ namespace asio = boost::asio;
+ return asio::async_compose<
+ CompletionToken, void()>(
+ [executor, f = std::move(f)] (auto& self) mutable {
+ auto ex = executor;
+ asio::dispatch(ex, [f = std::move(f),
+ self = std::move(self)]() mutable {
+ std::invoke(f);
+ auto ex2 = self.get_executor();
+ asio::dispatch(ex2, [self = std::move(self)]() mutable {
+ self.complete();
+ });
+ });
+ }, token, executor, std::forward<IOE>(ioe)...);
+}
+
+/// \brief Post a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+ boost::asio::completion_token_for<
+ void(std::invoke_result_t<F>)> CompletionToken,
+ typename ...IOE>
+auto async_post(boost::asio::execution::executor auto executor,
+ F&& f, CompletionToken&& token, IOE&& ...ioe)
+{
+ namespace asio = boost::asio;
+ return asio::async_compose<
+ CompletionToken,
+ void(std::invoke_result_t<F>)>(
+ [executor, f = std::move(f)] (auto& self) mutable {
+ auto ex = executor;
+ asio::post(ex, [f = std::move(f), self = std::move(self)]() mutable {
+ auto r = std::invoke(f);
+ auto ex2 = self.get_executor();
+ asio::dispatch(ex2, [self = std::move(self),
+ r = std::move(r)]() mutable {
+ self.complete(std::move(r));
+ });
+ });
+ }, token, executor, std::forward<IOE>(ioe)...);
+}
+
+/// \brief Post a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+ boost::asio::completion_token_for<void()> CompletionToken,
+ typename ...IOE>
+auto async_post(boost::asio::execution::executor auto executor,
+ F&& f, CompletionToken&& token, IOE&& ...ioe)
+ requires std::is_void_v<std::invoke_result_t<F>>
+{
+ namespace asio = boost::asio;
+ return asio::async_compose<
+ CompletionToken, void()>(
+ [executor, f = std::move(f)] (auto& self) mutable {
+ auto ex = executor;
+ asio::post(ex, [f = std::move(f), self = std::move(self)]() mutable {
+ std::invoke(f);
+ auto ex2 = self.get_executor();
+ asio::dispatch(ex2, [self = std::move(self)]() mutable {
+ self.complete();
+ });
+ });
+ }, token, executor, std::forward<IOE>(ioe)...);
+}
+
+/// \brief Defer a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+ boost::asio::completion_token_for<
+ void(std::invoke_result_t<F>)> CompletionToken,
+ typename ...IOE>
+auto async_defer(boost::asio::execution::executor auto executor,
+ F&& f, CompletionToken&& token, IOE&& ...ioe)
+{
+ namespace asio = boost::asio;
+ return asio::async_compose<
+ CompletionToken,
+ void(std::invoke_result_t<F>)>(
+ [executor, f = std::move(f)] (auto& self) mutable {
+ auto ex = executor;
+ asio::defer(ex, [f = std::move(f), self = std::move(self)]() mutable {
+ auto r = std::invoke(f);
+ auto ex2 = self.get_executor();
+ asio::dispatch(ex2, [r = std::move(r),
+ self = std::move(self)]() mutable {
+ self.complete(std::move(r));
+ });
+ });
+ }, token, executor, std::forward<IOE>(ioe)...);
+}
+
+/// \brief Defer a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+ boost::asio::completion_token_for<void()> CompletionToken,
+ typename ...IOE>
+auto async_defer(boost::asio::execution::executor auto executor, F&& f,
+ CompletionToken&& token, IOE&& ...ioe)
+ requires std::is_void_v<std::invoke_result_t<F>>
+{
+ namespace asio = boost::asio;
+ return asio::async_compose<
+ CompletionToken, void()>(
+ [executor, f = std::move(f)] (auto& self) mutable {
+ auto ex = executor;
+ asio::defer(ex, [f = std::move(f), self = std::move(self)]() mutable {
+ std::invoke(f);
+ auto ex2 = self.get_executor();
+ asio::dispatch(ex2, [self = std::move(self)]() mutable {
+ self.complete();
+ });
+ });
+ }, token, executor, std::forward<IOE>(ioe)...);
+}
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/async/async_call.h"
+
+#include <cerrno>
+#include <chrono>
+#include <functional>
+#include <exception>
+
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/deferred.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/strand.hpp>
+#include <boost/asio/steady_timer.hpp>
+#include <boost/asio/use_awaitable.hpp>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/errc.hpp>
+#include <boost/system/system_error.hpp>
+
+#include <gtest/gtest.h>
+
+namespace asio = boost::asio;
+namespace chrono = std::chrono;
+namespace sys = boost::system;
+using namespace std::literals;
+
+using ceph::async::async_dispatch;
+using ceph::async::async_post;
+using ceph::async::async_defer;
+
+inline constexpr auto dur = 50ms;
+
+template<typename Rep, typename Period>
+asio::awaitable<chrono::duration<Rep, Period>>
+wait_for(chrono::duration<Rep, Period> dur)
+{
+ asio::steady_timer timer{co_await asio::this_coro::executor, dur};
+ co_await timer.async_wait(asio::use_awaitable);
+ co_return dur;
+}
+
+TEST(AsyncDispatch, AsyncCall)
+{
+ asio::io_context c;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ auto x = co_await async_dispatch(
+ strand, [] {return 55;}, asio::use_awaitable);
+ EXPECT_EQ(x, 55);
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+}
+
+TEST(AsyncPost, AsyncCall)
+{
+ asio::io_context c;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ auto x = co_await async_post(
+ strand, [] {return 55;}, asio::use_awaitable);
+ EXPECT_EQ(x, 55);
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+}
+
+TEST(AsyncDefer, AsyncCall)
+{
+ asio::io_context c;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ auto x = co_await async_defer(
+ strand, [] {return 55;}, asio::use_awaitable);
+ EXPECT_EQ(x, 55);
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+}
+
+TEST(AsyncDispatchVoid, AsyncCall)
+{
+ asio::io_context c;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ co_await async_dispatch(
+ strand, [] {return;}, asio::use_awaitable);
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+}
+
+TEST(AsyncPostVoid, AsyncCall)
+{
+ asio::io_context c;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ co_await async_post(
+ strand, [] {return;}, asio::use_awaitable);
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+}
+
+TEST(AsyncDeferVoid, AsyncCall)
+{
+ asio::io_context c;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ co_await async_defer(
+ strand, [] {return;}, asio::use_awaitable);
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+}
+
+TEST(AsyncDispatchDeferred, AsyncCall)
+{
+ asio::io_context c;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ bool ran = false;
+ auto op = async_dispatch(strand, [&ran] {
+ ran = true;
+ return 55;
+ }, asio::deferred);
+ EXPECT_FALSE(ran);
+ std::move(op)([](int x) {
+ EXPECT_EQ(x, 55);
+ });
+ EXPECT_TRUE(ran);
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+}
+
+TEST(AsyncDispatchDeferredVoid, AsyncCall)
+{
+ asio::io_context c;
+ bool ran = false;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ auto op = async_dispatch(strand, [&ran] {
+ ran = true;
+ return;
+ }, asio::deferred);
+ EXPECT_FALSE(ran);
+ std::move(op)([]() {});
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+ EXPECT_TRUE(ran);
+}
+
+TEST(AsyncPostDeferred, AsyncCall)
+{
+ asio::io_context c;
+ bool ran = false;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ auto op = async_post(strand, [&ran] {
+ ran = true;
+ return 55;
+ }, asio::deferred);
+ EXPECT_FALSE(ran);
+ std::move(op)([](int x) {
+ EXPECT_EQ(x, 55);
+ });
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+ EXPECT_TRUE(ran);
+}
+
+TEST(AsyncPostDeferredVoid, AsyncCall)
+{
+ asio::io_context c;
+ bool ran = false;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ auto op = async_post(strand, [&ran] {
+ ran = true;
+ return;
+ }, asio::deferred);
+ EXPECT_FALSE(ran);
+ std::move(op)([]() {});
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+ EXPECT_TRUE(ran);
+}
+
+TEST(AsyncDeferDeferred, AsyncCall)
+{
+ asio::io_context c;
+ bool ran = false;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ auto op = async_defer(strand, [&ran] {
+ ran = true;
+ return 55;
+ }, asio::deferred);
+ EXPECT_FALSE(ran);
+ std::move(op)([](int x) {
+ EXPECT_EQ(x, 55);
+ });
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+ EXPECT_TRUE(ran);
+}
+
+TEST(AsyncDeferDeferredVoid, AsyncCall)
+{
+ asio::io_context c;
+ bool ran = false;
+ asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+ auto strand = asio::make_strand(c.get_executor());
+ auto op = async_defer(strand, [&ran] {
+ ran = true;
+ return;
+ }, asio::deferred);
+ EXPECT_FALSE(ran);
+ std::move(op)([]() {});
+ co_return;
+ }, [](std::exception_ptr e) {
+ if (e) std::rethrow_exception(e);
+ });
+ c.run();
+ EXPECT_TRUE(ran);
+}