]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: add max_concurrent_for_each() algorithm 57188/head
authorCasey Bodley <cbodley@redhat.com>
Thu, 9 May 2024 17:00:57 +0000 (13:00 -0400)
committerCasey Bodley <cbodley@redhat.com>
Thu, 23 May 2024 13:06:06 +0000 (09:06 -0400)
inspired by seastar's max_concurrent_for_each(), implemented for
optional_yield in terms of yield_throttle

can also be overloaded for co_await() and co_throttle (not part of this
branch)

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/max_concurrent_for_each.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_async_max_concurrent_for_each.cc [new file with mode: 0644]

diff --git a/src/common/async/max_concurrent_for_each.h b/src/common/async/max_concurrent_for_each.h
new file mode 100644 (file)
index 0000000..c0789a1
--- /dev/null
@@ -0,0 +1,89 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <concepts>
+#include <iterator>
+#include <ranges>
+#include <utility>
+#include <boost/asio/spawn.hpp>
+#include "cancel_on_error.h"
+#include "yield_context.h"
+#include "spawn_throttle.h"
+
+namespace ceph::async {
+
+/// Call a coroutine with each element in the given range then wait for all of
+/// them to complete. The first exception is rethrown to the caller. The
+/// cancel_on_error option controls whether these exceptions trigger the
+/// cancellation of other children. The number of outstanding coroutines
+/// is limited by the max_concurrent argument.
+///
+/// Example:
+/// \code
+/// void child(task& t, boost::asio::yield_context yield);
+///
+/// void parent(std::span<task> tasks, optional_yield y)
+/// {
+///   // process all tasks, up to 10 at a time
+///   max_concurrent_for_each(tasks, 10, y, child);
+/// }
+/// \endcode
+template <typename Iterator, typename Sentinel, typename Func,
+          typename Reference = std::iter_reference_t<Iterator>>
+    requires (std::input_iterator<Iterator> &&
+              std::sentinel_for<Sentinel, Iterator> &&
+              std::invocable<Func, Reference, boost::asio::yield_context>)
+void max_concurrent_for_each(Iterator begin,
+                             Sentinel end,
+                             size_t max_concurrent,
+                             optional_yield y,
+                             Func&& func,
+                             cancel_on_error on_error = cancel_on_error::none)
+{
+  const size_t count = std::ranges::distance(begin, end);
+  if (!count) {
+    return;
+  }
+  auto throttle = spawn_throttle{y, max_concurrent, on_error};
+  for (Iterator i = begin; i != end; ++i) {
+    boost::asio::spawn(throttle.get_executor(),
+                       [&func, &val = *i] (boost::asio::yield_context yield) {
+                         func(val, yield);
+                       }, throttle);
+  }
+  throttle.wait();
+}
+
+/// \overload
+template <typename Range, typename Func,
+          typename Reference = std::ranges::range_reference_t<Range>>
+    requires (std::ranges::range<Range> &&
+              std::invocable<Func, Reference, boost::asio::yield_context>)
+auto max_concurrent_for_each(Range&& range,
+                             size_t max_concurrent,
+                             optional_yield y,
+                             Func&& func,
+                             cancel_on_error on_error = cancel_on_error::none)
+{
+  return max_concurrent_for_each(std::begin(range), std::end(range),
+                                 max_concurrent, y, std::forward<Func>(func),
+                                 on_error);
+}
+
+// TODO: overloads for co_spawn()
+
+} // namespace ceph::async
index d54ea127cc9687e9746d62b33a361beb61fd35ee..428ef7b01470f42328555baf9f365a491ae23824 100644 (file)
@@ -357,6 +357,10 @@ add_executable(unittest_async_completion test_async_completion.cc)
 add_ceph_unittest(unittest_async_completion)
 target_link_libraries(unittest_async_completion ceph-common Boost::system)
 
+add_executable(unittest_async_max_concurrent_for_each test_async_max_concurrent_for_each.cc)
+add_ceph_unittest(unittest_async_max_concurrent_for_each)
+target_link_libraries(unittest_async_max_concurrent_for_each ceph-common Boost::system Boost::context)
+
 add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc)
 add_ceph_unittest(unittest_async_shared_mutex)
 target_link_libraries(unittest_async_shared_mutex ceph-common Boost::system)
diff --git a/src/test/common/test_async_max_concurrent_for_each.cc b/src/test/common/test_async_max_concurrent_for_each.cc
new file mode 100644 (file)
index 0000000..2e6f919
--- /dev/null
@@ -0,0 +1,225 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/async/max_concurrent_for_each.h"
+
+#include <chrono>
+#include <exception>
+#include <optional>
+#include <boost/asio/spawn.hpp>
+#include <boost/asio/steady_timer.hpp>
+#include <gtest/gtest.h>
+
+namespace ceph::async {
+
+namespace asio = boost::asio;
+
+void rethrow(std::exception_ptr eptr)
+{
+  if (eptr) std::rethrow_exception(eptr);
+}
+
+using namespace std::chrono_literals;
+
+void wait_for(std::chrono::milliseconds dur, asio::yield_context yield)
+{
+  auto timer = asio::steady_timer{yield.get_executor(), dur};
+  timer.async_wait(yield);
+}
+
+struct null_sentinel {};
+bool operator==(const char* c, null_sentinel) { return !*c; }
+static_assert(std::sentinel_for<null_sentinel, const char*>);
+
+TEST(iterator_null_yield, empty)
+{
+  int* end = nullptr;
+  auto cr = [] (int, asio::yield_context) {};
+  max_concurrent_for_each(end, end, 10, null_yield, cr);
+}
+
+TEST(iterator_null_yield, over_limit)
+{
+  int concurrent = 0;
+  int max_concurrent = 0;
+  int completed = 0;
+
+  auto cr = [&] (int, asio::yield_context yield) {
+    ++concurrent;
+    if (max_concurrent < concurrent) {
+      max_concurrent = concurrent;
+    }
+
+    wait_for(1ms, yield);
+
+    --concurrent;
+    ++completed;
+  };
+
+  constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
+  max_concurrent_for_each(begin(arr), end(arr), 2, null_yield, cr);
+
+  EXPECT_EQ(0, concurrent);
+  EXPECT_EQ(2, max_concurrent);
+  EXPECT_EQ(10, completed);
+}
+
+TEST(iterator_null_yield, sentinel)
+{
+  const char* begin = "hello";
+  null_sentinel end;
+
+  size_t completed = 0;
+  auto cr = [&completed] (char c, asio::yield_context) { ++completed; };
+  max_concurrent_for_each(begin, end, 10, null_yield, cr);
+  EXPECT_EQ(completed, 5);
+}
+
+TEST(range_null_yield, empty)
+{
+  constexpr std::array<int, 0> arr{};
+  auto cr = [] (int, asio::yield_context) {};
+  max_concurrent_for_each(arr, 10, null_yield, cr);
+}
+
+TEST(range_null_yield, over_limit)
+{
+  int concurrent = 0;
+  int max_concurrent = 0;
+  int completed = 0;
+
+  auto cr = [&] (int, asio::yield_context yield) {
+    ++concurrent;
+    if (max_concurrent < concurrent) {
+      max_concurrent = concurrent;
+    }
+
+    wait_for(1ms, yield);
+
+    --concurrent;
+    ++completed;
+  };
+
+  constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
+  max_concurrent_for_each(arr, 2, null_yield, cr);
+
+  EXPECT_EQ(0, concurrent);
+  EXPECT_EQ(2, max_concurrent);
+  EXPECT_EQ(10, completed);
+}
+
+
+TEST(iterator_yield, empty)
+{
+  int* end = nullptr;
+  auto cr = [] (int, asio::yield_context) {};
+
+  asio::io_context ctx;
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+        max_concurrent_for_each(end, end, 10, yield, cr);
+      }, rethrow);
+  ctx.run();
+}
+
+TEST(iterator_yield, over_limit)
+{
+  int concurrent = 0;
+  int max_concurrent = 0;
+  int completed = 0;
+
+  auto cr = [&] (int, asio::yield_context yield) {
+    ++concurrent;
+    if (max_concurrent < concurrent) {
+      max_concurrent = concurrent;
+    }
+
+    wait_for(1ms, yield);
+
+    --concurrent;
+    ++completed;
+  };
+
+  asio::io_context ctx;
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+        constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
+        max_concurrent_for_each(begin(arr), end(arr), 2, yield, cr);
+      }, rethrow);
+  ctx.run();
+
+  EXPECT_EQ(0, concurrent);
+  EXPECT_EQ(2, max_concurrent);
+  EXPECT_EQ(10, completed);
+}
+
+TEST(iterator_yield, sentinel)
+{
+  const char* begin = "hello";
+  null_sentinel end;
+
+  size_t completed = 0;
+  auto cr = [&completed] (char c, asio::yield_context) { ++completed; };
+
+  asio::io_context ctx;
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+        max_concurrent_for_each(begin, end, 10, yield, cr);
+      }, rethrow);
+  ctx.run();
+
+  EXPECT_EQ(completed, 5);
+}
+
+TEST(range_yield, empty)
+{
+  constexpr std::array<int, 0> arr{};
+  auto cr = [] (int, asio::yield_context) {};
+
+  asio::io_context ctx;
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+  max_concurrent_for_each(arr, 10, yield, cr);
+      }, rethrow);
+  ctx.run();
+}
+
+TEST(range_yield, over_limit)
+{
+  int concurrent = 0;
+  int max_concurrent = 0;
+  int completed = 0;
+
+  auto cr = [&] (int, asio::yield_context yield) {
+    ++concurrent;
+    if (max_concurrent < concurrent) {
+      max_concurrent = concurrent;
+    }
+
+    wait_for(1ms, yield);
+
+    --concurrent;
+    ++completed;
+  };
+
+  asio::io_context ctx;
+  asio::spawn(ctx, [&] (asio::yield_context yield) {
+        constexpr auto arr = std::array{1,2,3,4,5,6,7,8,9,10};
+        max_concurrent_for_each(arr, 2, yield, cr);
+      }, rethrow);
+  ctx.run();
+
+  EXPECT_EQ(0, concurrent);
+  EXPECT_EQ(2, max_concurrent);
+  EXPECT_EQ(10, completed);
+}
+
+} // namespace ceph::async