]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: Add async_{dispatch,post,defer}
authorAdam C. Emerson <aemerson@redhat.com>
Tue, 23 May 2023 00:15:05 +0000 (20:15 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:13 +0000 (11:10 -0400)
These functions let you run functions on arbitrary executors,
providing a convenient way to mix I/O less critical sections on a
strand in a coroutine.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/common/async/async_call.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_async_call.cc [new file with mode: 0644]

diff --git a/src/common/async/async_call.h b/src/common/async/async_call.h
new file mode 100644 (file)
index 0000000..d118070
--- /dev/null
@@ -0,0 +1,240 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+/// \file common/async/async_call.h
+///
+/// \brief Wait for functions to run on arbitrary executors
+
+#include <concepts>
+#include <type_traits>
+
+#include <boost/asio/compose.hpp>
+#include <boost/asio/defer.hpp>
+#include <boost/asio/dispatch.hpp>
+#include <boost/asio/post.hpp>
+
+namespace ceph::async {
+/// \brief Dispatch a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+        boost::asio::completion_token_for<
+          void(std::invoke_result_t<F>)> CompletionToken,
+        typename ...IOE>
+auto async_dispatch(boost::asio::execution::executor auto executor,
+                   F&& f, CompletionToken&& token, IOE&& ...ioe)
+{
+  namespace asio = boost::asio;
+  return asio::async_compose<CompletionToken, void(std::invoke_result_t<F>)>(
+      [executor, f = std::move(f)](auto &self) mutable {
+       auto ex = executor;
+       asio::dispatch(ex, [f = std::move(f),
+                           self = std::move(self)]() mutable {
+         auto r = std::invoke(f);
+         auto ex2 = self.get_executor();
+         asio::dispatch(ex2, [r = std::move(r),
+                              self = std::move(self)]() mutable {
+           self.complete(std::move(r));
+         });
+        });
+      },
+      token, executor, std::forward<IOE>(ioe)...);
+}
+
+/// \brief Dispatch a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+        boost::asio::completion_token_for<void()> CompletionToken,
+        typename ...IOE>
+auto async_dispatch(boost::asio::execution::executor auto executor,
+                   F&& f, CompletionToken&& token, IOE&& ...ioe)
+  requires std::is_void_v<std::invoke_result_t<F>>
+{
+  namespace asio = boost::asio;
+  return asio::async_compose<
+    CompletionToken, void()>(
+      [executor, f = std::move(f)] (auto& self) mutable {
+       auto ex = executor;
+       asio::dispatch(ex, [f = std::move(f),
+                           self = std::move(self)]() mutable {
+         std::invoke(f);
+         auto ex2 = self.get_executor();
+         asio::dispatch(ex2, [self = std::move(self)]() mutable {
+           self.complete();
+         });
+       });
+      }, token, executor, std::forward<IOE>(ioe)...);
+}
+
+/// \brief Post a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+        boost::asio::completion_token_for<
+          void(std::invoke_result_t<F>)> CompletionToken,
+        typename ...IOE>
+auto async_post(boost::asio::execution::executor auto executor,
+               F&& f, CompletionToken&& token, IOE&& ...ioe)
+{
+  namespace asio = boost::asio;
+  return asio::async_compose<
+    CompletionToken,
+    void(std::invoke_result_t<F>)>(
+      [executor, f = std::move(f)] (auto& self) mutable {
+       auto ex = executor;
+       asio::post(ex, [f = std::move(f), self = std::move(self)]() mutable {
+         auto r = std::invoke(f);
+         auto ex2 = self.get_executor();
+         asio::dispatch(ex2, [self = std::move(self),
+                              r = std::move(r)]() mutable {
+           self.complete(std::move(r));
+         });
+       });
+      }, token, executor, std::forward<IOE>(ioe)...);
+}
+
+/// \brief Post a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+        boost::asio::completion_token_for<void()> CompletionToken,
+        typename ...IOE>
+auto async_post(boost::asio::execution::executor auto executor,
+               F&& f, CompletionToken&& token, IOE&& ...ioe)
+  requires std::is_void_v<std::invoke_result_t<F>>
+{
+  namespace asio = boost::asio;
+  return asio::async_compose<
+    CompletionToken, void()>(
+      [executor, f = std::move(f)] (auto& self) mutable {
+       auto ex = executor;
+       asio::post(ex, [f = std::move(f), self = std::move(self)]() mutable {
+         std::invoke(f);
+         auto ex2 = self.get_executor();
+         asio::dispatch(ex2, [self = std::move(self)]() mutable {
+           self.complete();
+         });
+       });
+      }, token, executor, std::forward<IOE>(ioe)...);
+}
+
+/// \brief Defer a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+        boost::asio::completion_token_for<
+          void(std::invoke_result_t<F>)> CompletionToken,
+        typename ...IOE>
+auto async_defer(boost::asio::execution::executor auto executor,
+                F&& f, CompletionToken&& token, IOE&& ...ioe)
+{
+  namespace asio = boost::asio;
+  return asio::async_compose<
+    CompletionToken,
+    void(std::invoke_result_t<F>)>(
+      [executor, f = std::move(f)] (auto& self) mutable {
+       auto ex = executor;
+       asio::defer(ex, [f = std::move(f), self = std::move(self)]() mutable {
+         auto r = std::invoke(f);
+         auto ex2 = self.get_executor();
+         asio::dispatch(ex2, [r = std::move(r),
+                              self = std::move(self)]() mutable {
+           self.complete(std::move(r));
+         });
+       });
+      }, token, executor, std::forward<IOE>(ioe)...);
+}
+
+/// \brief Defer a function on another executor and wait for it to
+/// finish
+///
+/// Useful with strands and similar
+///
+/// \param executor The executor on which to call the function
+/// \param f The function to call
+/// \param token Boost.Asio completion token
+/// \param ioe Parameter pack of executors and I/O Objects to keep live
+///
+/// \return The return value of `f` in a way appropriate to the
+/// completion token. See Boost.Asio documentation.
+template<std::invocable<> F,
+        boost::asio::completion_token_for<void()> CompletionToken,
+        typename ...IOE>
+auto async_defer(boost::asio::execution::executor auto executor, F&& f,
+                CompletionToken&& token, IOE&& ...ioe)
+  requires std::is_void_v<std::invoke_result_t<F>>
+{
+  namespace asio = boost::asio;
+  return asio::async_compose<
+    CompletionToken, void()>(
+      [executor, f = std::move(f)] (auto& self) mutable {
+       auto ex = executor;
+       asio::defer(ex, [f = std::move(f), self = std::move(self)]() mutable {
+         std::invoke(f);
+         auto ex2 = self.get_executor();
+         asio::dispatch(ex2, [self = std::move(self)]() mutable {
+           self.complete();
+         });
+       });
+      }, token, executor, std::forward<IOE>(ioe)...);
+}
+}
index 34f041e61de5d74e6ad1322b3dca086cc70762f4..1402fa24fd3ff1034a2f3327790ff8525cc7b2b7 100644 (file)
@@ -482,3 +482,8 @@ add_test(NAME unittest_decode_start_v_checker_expect_failure
          WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
 set_tests_properties(unittest_decode_start_v_checker_expect_failure
   PROPERTIES WILL_FAIL TRUE)
+
+add_executable(unittest_async_call test_async_call.cc)
+target_link_libraries(unittest_async_call librados ceph-common Boost::system
+  GTest::GTest)
+add_ceph_unittest(unittest_async_call)
diff --git a/src/test/common/test_async_call.cc b/src/test/common/test_async_call.cc
new file mode 100644 (file)
index 0000000..ceb6aa8
--- /dev/null
@@ -0,0 +1,267 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/async/async_call.h"
+
+#include <cerrno>
+#include <chrono>
+#include <functional>
+#include <exception>
+
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/deferred.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/strand.hpp>
+#include <boost/asio/steady_timer.hpp>
+#include <boost/asio/use_awaitable.hpp>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/errc.hpp>
+#include <boost/system/system_error.hpp>
+
+#include <gtest/gtest.h>
+
+namespace asio = boost::asio;
+namespace chrono = std::chrono;
+namespace sys = boost::system;
+using namespace std::literals;
+
+using ceph::async::async_dispatch;
+using ceph::async::async_post;
+using ceph::async::async_defer;
+
+inline constexpr auto dur = 50ms;
+
+template<typename Rep, typename Period>
+asio::awaitable<chrono::duration<Rep, Period>>
+wait_for(chrono::duration<Rep, Period> dur)
+{
+  asio::steady_timer timer{co_await asio::this_coro::executor, dur};
+  co_await timer.async_wait(asio::use_awaitable);
+  co_return dur;
+}
+
+TEST(AsyncDispatch, AsyncCall)
+{
+  asio::io_context c;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      auto x = co_await async_dispatch(
+       strand, [] {return 55;}, asio::use_awaitable);
+      EXPECT_EQ(x, 55);
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+}
+
+TEST(AsyncPost, AsyncCall)
+{
+  asio::io_context c;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      auto x = co_await async_post(
+       strand, [] {return 55;}, asio::use_awaitable);
+      EXPECT_EQ(x, 55);
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+}
+
+TEST(AsyncDefer, AsyncCall)
+{
+  asio::io_context c;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      auto x = co_await async_defer(
+       strand, [] {return 55;}, asio::use_awaitable);
+      EXPECT_EQ(x, 55);
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+}
+
+TEST(AsyncDispatchVoid, AsyncCall)
+{
+  asio::io_context c;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      co_await async_dispatch(
+       strand, [] {return;}, asio::use_awaitable);
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+}
+
+TEST(AsyncPostVoid, AsyncCall)
+{
+  asio::io_context c;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      co_await async_post(
+       strand, [] {return;}, asio::use_awaitable);
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+}
+
+TEST(AsyncDeferVoid, AsyncCall)
+{
+  asio::io_context c;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      co_await async_defer(
+       strand, [] {return;}, asio::use_awaitable);
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+}
+
+TEST(AsyncDispatchDeferred, AsyncCall)
+{
+  asio::io_context c;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      bool ran = false;
+      auto op = async_dispatch(strand, [&ran] {
+       ran = true;
+       return 55;
+      }, asio::deferred);
+      EXPECT_FALSE(ran);
+      std::move(op)([](int x) {
+       EXPECT_EQ(x, 55);
+      });
+      EXPECT_TRUE(ran);
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+}
+
+TEST(AsyncDispatchDeferredVoid, AsyncCall)
+{
+  asio::io_context c;
+  bool ran = false;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      auto op = async_dispatch(strand, [&ran] {
+       ran = true;
+       return;
+      }, asio::deferred);
+      EXPECT_FALSE(ran);
+      std::move(op)([]() {});
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+  EXPECT_TRUE(ran);
+}
+
+TEST(AsyncPostDeferred, AsyncCall)
+{
+  asio::io_context c;
+  bool ran = false;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      auto op = async_post(strand, [&ran] {
+       ran = true;
+       return 55;
+      }, asio::deferred);
+      EXPECT_FALSE(ran);
+      std::move(op)([](int x) {
+       EXPECT_EQ(x, 55);
+      });
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+  EXPECT_TRUE(ran);
+}
+
+TEST(AsyncPostDeferredVoid, AsyncCall)
+{
+  asio::io_context c;
+  bool ran = false;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      auto op = async_post(strand, [&ran] {
+       ran = true;
+       return;
+      }, asio::deferred);
+      EXPECT_FALSE(ran);
+      std::move(op)([]() {});
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+  EXPECT_TRUE(ran);
+}
+
+TEST(AsyncDeferDeferred, AsyncCall)
+{
+  asio::io_context c;
+  bool ran = false;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      auto op = async_defer(strand, [&ran] {
+       ran = true;
+       return 55;
+      }, asio::deferred);
+      EXPECT_FALSE(ran);
+      std::move(op)([](int x) {
+       EXPECT_EQ(x, 55);
+      });
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+  EXPECT_TRUE(ran);
+}
+
+TEST(AsyncDeferDeferredVoid, AsyncCall)
+{
+  asio::io_context c;
+  bool ran = false;
+  asio::co_spawn(c, [&]() -> asio::awaitable<void> {
+      auto strand = asio::make_strand(c.get_executor());
+      auto op = async_defer(strand, [&ran] {
+       ran = true;
+       return;
+      }, asio::deferred);
+      EXPECT_FALSE(ran);
+      std::move(op)([]() {});
+      co_return;
+    }, [](std::exception_ptr e) {
+      if (e) std::rethrow_exception(e);
+    });
+  c.run();
+  EXPECT_TRUE(ran);
+}