]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: add parallel_for_each() algorithm 58348/head
authorCasey Bodley <cbodley@redhat.com>
Sun, 5 Feb 2023 22:08:49 +0000 (17:08 -0500)
committerCasey Bodley <cbodley@redhat.com>
Wed, 24 Jul 2024 16:51:46 +0000 (12:51 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/parallel_for_each.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_async_parallel_for_each.cc [new file with mode: 0644]

diff --git a/src/common/async/parallel_for_each.h b/src/common/async/parallel_for_each.h
new file mode 100644 (file)
index 0000000..cb49703
--- /dev/null
@@ -0,0 +1,86 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <concepts>
+#include <iterator>
+#include <ranges>
+#include <type_traits>
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/execution/executor.hpp>
+#include <boost/asio/this_coro.hpp>
+#include "co_spawn_group.h"
+
+namespace ceph::async {
+
+/// Call a coroutine with each element in the given range then wait for all
+/// of them to complete. The first exception is rethrown to the caller. The
+/// cancel_on_error option controls whether these exceptions trigger the
+/// cancellation of other children.
+///
+/// Example:
+/// \code
+/// awaitable<void> child(task& t);
+///
+/// awaitable<void> parent(std::span<task> tasks)
+/// {
+///   co_await parallel_for_each(tasks.begin(), tasks.end(), child);
+/// }
+/// \endcode
+template <typename Iterator, typename Sentinel, typename VoidAwaitableFactory,
+          typename Value = std::iter_reference_t<Iterator>,
+          typename VoidAwaitable = std::invoke_result_t<
+              VoidAwaitableFactory, Value>,
+          typename AwaitableT = typename VoidAwaitable::value_type,
+          typename AwaitableExecutor = typename VoidAwaitable::executor_type>
+    requires (std::input_iterator<Iterator> &&
+              std::sentinel_for<Sentinel, Iterator> &&
+              std::same_as<AwaitableT, void> &&
+              boost::asio::execution::executor<AwaitableExecutor>)
+auto parallel_for_each(Iterator begin, Sentinel end,
+                       VoidAwaitableFactory&& factory,
+                       cancel_on_error on_error = cancel_on_error::none)
+    -> boost::asio::awaitable<void, AwaitableExecutor>
+{
+  const size_t count = std::ranges::distance(begin, end);
+  if (!count) {
+    co_return;
+  }
+  auto ex = co_await boost::asio::this_coro::executor;
+  auto group = co_spawn_group{ex, count, on_error};
+  for (Iterator i = begin; i != end; ++i) {
+    group.spawn(factory(*i));
+  }
+  co_await group.wait();
+}
+
+/// \overload
+template <typename Range, typename VoidAwaitableFactory,
+          typename Value = std::ranges::range_reference_t<Range>,
+          typename VoidAwaitable = std::invoke_result_t<
+              VoidAwaitableFactory, Value>,
+          typename AwaitableT = typename VoidAwaitable::value_type,
+          typename AwaitableExecutor = typename VoidAwaitable::executor_type>
+    requires (std::ranges::range<Range> &&
+              std::same_as<AwaitableT, void> &&
+              boost::asio::execution::executor<AwaitableExecutor>)
+auto parallel_for_each(Range&& range, VoidAwaitableFactory&& factory,
+                       cancel_on_error on_error = cancel_on_error::none)
+    -> boost::asio::awaitable<void, AwaitableExecutor>
+{
+  return parallel_for_each(std::begin(range), std::end(range),
+                           std::move(factory), on_error);
+}
+
+} // namespace ceph::async
index 96bbb35f73bc9b01fa8655e17a172fa235092694..33ff38b932df4249d0952ba66502d3f6adbf1b45 100644 (file)
@@ -388,6 +388,10 @@ add_executable(unittest_async_yield_waiter test_async_yield_waiter.cc)
 add_ceph_unittest(unittest_async_yield_waiter)
 target_link_libraries(unittest_async_yield_waiter ceph-common Boost::system Boost::context)
 
+add_executable(unittest_async_parallel_for_each test_async_parallel_for_each.cc)
+add_ceph_unittest(unittest_async_parallel_for_each)
+target_link_libraries(unittest_async_parallel_for_each ceph-common Boost::system)
+
 add_executable(unittest_cdc test_cdc.cc
   $<TARGET_OBJECTS:unit-main>)
 target_link_libraries(unittest_cdc global ceph-common)
diff --git a/src/test/common/test_async_parallel_for_each.cc b/src/test/common/test_async_parallel_for_each.cc
new file mode 100644 (file)
index 0000000..be04221
--- /dev/null
@@ -0,0 +1,258 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/async/parallel_for_each.h"
+
+#include <optional>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/io_context.hpp>
+#include <gtest/gtest.h>
+#include "common/async/co_waiter.h"
+
+namespace ceph::async {
+
+namespace asio = boost::asio;
+namespace errc = boost::system::errc;
+using boost::system::error_code;
+
+using executor_type = asio::io_context::executor_type;
+
+template <typename T>
+using awaitable = asio::awaitable<T, executor_type>;
+
+using void_waiter = co_waiter<void, executor_type>;
+
+template <typename T>
+auto capture(std::optional<T>& opt)
+{
+  return [&opt] (T value) { opt = std::move(value); };
+}
+
+template <typename T>
+auto capture(asio::cancellation_signal& signal, std::optional<T>& opt)
+{
+  return asio::bind_cancellation_slot(signal.slot(), capture(opt));
+}
+
+TEST(parallel_for_each, empty)
+{
+  asio::io_context ctx;
+
+  int* end = nullptr;
+  auto cr = [] (int i) -> awaitable<void> { co_return; };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(end, end, cr), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(parallel_for_each, shutdown)
+{
+  asio::io_context ctx;
+
+  void_waiter waiters[2];
+  auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  // shut down before any waiters complete
+}
+
+TEST(parallel_for_each, cancel)
+{
+  asio::io_context ctx;
+
+  void_waiter waiters[2];
+  auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // cancel before any waiters complete
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(parallel_for_each, complete_shutdown)
+{
+  asio::io_context ctx;
+
+  void_waiter waiters[2];
+  auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiters[0].complete(nullptr);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  // shut down before final waiter completes
+}
+
+TEST(parallel_for_each, complete_cancel)
+{
+  asio::io_context ctx;
+
+  void_waiter waiters[2];
+  auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+  asio::cancellation_signal signal;
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(signal, result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiters[0].complete(nullptr);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // cancel before final waiter completes
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(parallel_for_each, complete_complete)
+{
+  asio::io_context ctx;
+
+  void_waiter waiters[2];
+  auto cr = [] (void_waiter& w) -> awaitable<void> { return w.get(); };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(waiters, cr), capture(result));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiters[0].complete(nullptr);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  waiters[1].complete(nullptr);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+struct null_sentinel {};
+bool operator==(const char* c, null_sentinel) { return !*c; }
+static_assert(std::sentinel_for<null_sentinel, const char*>);
+
+TEST(parallel_for_each, sentinel)
+{
+  asio::io_context ctx;
+
+  const char* begin = "hello";
+  null_sentinel end;
+
+  size_t count = 0;
+  auto cr = [&count] (char c) -> awaitable<void> {
+    ++count;
+    co_return;
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(begin, end, cr), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+  EXPECT_EQ(count, 5);
+}
+
+TEST(parallel_for_each, move_iterator)
+{
+  asio::io_context ctx;
+
+  using value_type = std::unique_ptr<int>;
+  value_type values[] = {
+    std::make_unique<int>(42),
+    std::make_unique<int>(43),
+  };
+
+  auto begin = std::make_move_iterator(std::begin(values));
+  auto end = std::make_move_iterator(std::end(values));
+
+  auto cr = [] (value_type v) -> awaitable<void> {
+    if (!v) {
+      throw std::invalid_argument("empty");
+    }
+    co_return;
+  };
+
+  std::optional<std::exception_ptr> result;
+  asio::co_spawn(ctx, parallel_for_each(begin, end, cr), capture(result));
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+
+  EXPECT_FALSE(values[0]);
+  EXPECT_FALSE(values[1]);
+}
+
+} // namespace ceph::async