]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: spawn_throttle wraps call to asio::spawn() 58798/head
authorCasey Bodley <cbodley@redhat.com>
Mon, 22 Jul 2024 20:48:29 +0000 (16:48 -0400)
committerCasey Bodley <cbodley@redhat.com>
Wed, 24 Jul 2024 19:35:05 +0000 (15:35 -0400)
cancellation of the parent must immediately cancel its children, which
only works if the children are on the same executor as the parent

prohibit child coroutines from being spawned on a different executor by
wrapping the call to asio::spawn() in a new spawn_throttle::spawn()
interface

expose an overload for asio::spawn()'s optional StackAllocator argument

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/max_concurrent_for_each.h
src/common/async/spawn_throttle.h
src/rgw/rgw_op.cc
src/test/common/test_async_spawn_throttle.cc

index c0789a1606431e41d3a4783698905483dbb7531d..04f61729ba76f0b1ea8a3633843862294c5e42ab 100644 (file)
@@ -60,10 +60,9 @@ void max_concurrent_for_each(Iterator begin,
   }
   auto throttle = spawn_throttle{y, max_concurrent, on_error};
   for (Iterator i = begin; i != end; ++i) {
-    boost::asio::spawn(throttle.get_executor(),
-                       [&func, &val = *i] (boost::asio::yield_context yield) {
-                         func(val, yield);
-                       }, throttle);
+    throttle.spawn([&func, &val = *i] (boost::asio::yield_context yield) {
+        func(val, yield);
+      });
   }
   throttle.wait();
 }
index 02e07ca3debeb99b352b89dce916acce4fbe7092..1fdff1928c7fe08d971f7cdea48d1115fc111875 100644 (file)
@@ -28,11 +28,10 @@ namespace ceph::async {
 /// The parent may either be a synchronous function or a stackful coroutine,
 /// depending on the optional_yield constructor argument.
 ///
-/// Child coroutines are spawned by calling boost::asio::spawn() and using the
-/// spawn_throttle object as the CompletionToken argument. Exceptions thrown
-/// by children are reported to the caller on its next call to get() or wait().
-/// The cancel_on_error option controls whether these exceptions trigger the
-/// cancellation of other children.
+/// Child coroutines take boost::asio::yield_context as the only argument.
+/// Exceptions thrown by children are reported to the caller on its next call
+/// to spawn() or wait(). The cancel_on_error option controls whether these
+/// exceptions trigger the cancellation of other children.
 ///
 /// All child coroutines are canceled by cancel() or spawn_throttle destruction.
 /// This allows a parent function to share memory with its child coroutines
@@ -51,7 +50,7 @@ namespace ceph::async {
 ///   auto throttle = ceph::async::spawn_throttle{y, 10};
 ///
 ///   for (size_t i = 0; i < count; i++) {
-///     boost::asio::spawn(throttle.get_executor(), child, throttle);
+///     throttle.spawn(child);
 ///   }
 ///   throttle.wait();
 /// }
@@ -86,19 +85,24 @@ class spawn_throttle {
     return impl->get_executor();
   }
 
-  /// Return a cancellable spawn() completion handler with signature
-  /// void(std::exception_ptr).
+  /// Spawn a cancellable coroutine to call the given function, passing its
+  /// boost::asio::yield_context as the only argument.
   ///
-  /// This function may block until a throttle unit becomes available. If one or
-  /// more previously-spawned coroutines exit with an exception, the first such
-  /// exception is rethrown here. 
-  ///
-  /// As a convenience, you can avoid calling this function by using the
-  /// spawn_throttle itself as a CompletionToken for spawn().
-  auto get()
-    -> detail::spawn_throttle_handler
+  /// Before spawning, this function may block until a throttle unit becomes
+  /// available. If one or more previously-spawned coroutines exit with an
+  /// exception, the first such exception is rethrown here.
+  template <typename F>
+  void spawn(F&& f)
+  {
+    boost::asio::spawn(get_executor(), std::forward<F>(f), impl->get());
+  }
+
+  /// /overload
+  template <typename StackAllocator, typename F>
+  void spawn(std::allocator_arg_t arg, StackAllocator&& alloc, F&& f)
   {
-    return impl->get();
+    boost::asio::spawn(get_executor(), arg, std::forward<StackAllocator>(alloc),
+                       std::forward<F>(f), impl->get());
   }
 
   /// Wait for all outstanding completions before returning. If any
@@ -120,27 +124,3 @@ class spawn_throttle {
 };
 
 } // namespace ceph::async
-
-namespace boost::asio {
-
-// Allow spawn_throttle to be used as a CompletionToken.
-template <typename Signature>
-struct async_result<ceph::async::spawn_throttle, Signature>
-{
-  using completion_handler_type =
-      ceph::async::detail::spawn_throttle_handler;
-  async_result(completion_handler_type&) {}
-
-  using return_type = void;
-  return_type get() {}
-
-  template <typename Initiation, typename... Args>
-  static return_type initiate(Initiation&& init,
-                              ceph::async::spawn_throttle& throttle,
-                              Args&& ...args)
-  {
-    return std::move(init)(throttle.get(), std::forward<Args>(args)...);
-  }
-};
-
-} // namespace boost::asio
index 31a74e183e63b9ba29206d1d0c051fd7e5e8501d..ace916cd7eaf62dfbd86908938622cb10871a943 100644 (file)
@@ -7156,10 +7156,9 @@ void RGWDeleteMultiObj::execute(optional_yield y)
   auto group = ceph::async::spawn_throttle{y, max_aio};
 
   for (const auto& key : multi_delete->objects) {
-    boost::asio::spawn(group.get_executor(),
-                       [this, &key] (boost::asio::yield_context yield) {
-                         handle_individual_object(key, yield);
-                       }, group);
+    group.spawn([this, &key] (boost::asio::yield_context yield) {
+                  handle_individual_object(key, yield);
+                });
 
     rgw_flush_formatter(s, s->formatter);
   }
index 6f306a13918dadccba698b53a77511b41e09399f..7e19738a8e2ce7859fe4859c6114ffe2d3924566 100644 (file)
@@ -81,7 +81,7 @@ TEST(YieldGroupSync, spawn_wait)
   };
 
   auto throttle = spawn_throttle{null_yield, 2};
-  asio::spawn(throttle.get_executor(), cr, throttle);
+  throttle.spawn(cr);
   throttle.wait();
 
   EXPECT_EQ(1, completed);
@@ -90,7 +90,7 @@ TEST(YieldGroupSync, spawn_wait)
 TEST(YieldGroupSync, spawn_shutdown)
 {
   auto throttle = spawn_throttle{null_yield, 2};
-  asio::spawn(throttle.get_executor(), wait_for(1s), throttle);
+  throttle.spawn(wait_for(1s));
 }
 
 TEST(YieldGroupSync, spawn_cancel_wait)
@@ -103,7 +103,7 @@ TEST(YieldGroupSync, spawn_cancel_wait)
   };
 
   auto throttle = spawn_throttle{null_yield, 2};
-  asio::spawn(throttle.get_executor(), cr, throttle);
+  throttle.spawn(cr);
   throttle.cancel();
   EXPECT_THROW(throttle.wait(), boost::system::system_error);
 
@@ -120,10 +120,10 @@ TEST(YieldGroupSync, spawn_cancel_wait_spawn_wait)
   };
 
   auto throttle = spawn_throttle{null_yield, 2};
-  asio::spawn(throttle.get_executor(), cr, throttle);
+  throttle.spawn(cr);
   throttle.cancel();
   EXPECT_THROW(throttle.wait(), boost::system::system_error);
-  asio::spawn(throttle.get_executor(), cr, throttle);
+  throttle.spawn(cr);
   throttle.wait();
 
   EXPECT_EQ(1, completed);
@@ -148,10 +148,10 @@ TEST(YieldGroupSync, spawn_over_limit)
   };
 
   auto throttle = spawn_throttle{null_yield, 2};
-  asio::spawn(throttle.get_executor(), cr, throttle);
-  asio::spawn(throttle.get_executor(), cr, throttle);
-  asio::spawn(throttle.get_executor(), cr, throttle); // blocks
-  asio::spawn(throttle.get_executor(), cr, throttle); // blocks
+  throttle.spawn(cr);
+  throttle.spawn(cr);
+  throttle.spawn(cr); // blocks
+  throttle.spawn(cr); // blocks
   throttle.wait(); // blocks
 
   EXPECT_EQ(0, concurrent);
@@ -173,10 +173,10 @@ TEST(YieldGroupSync, spawn_cancel_on_error_none)
   };
 
   auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::none};
-  asio::spawn(throttle.get_executor(), cr, throttle);
-  asio::spawn(throttle.get_executor(), cr, throttle);
-  asio::spawn(throttle.get_executor(), err, throttle);
-  asio::spawn(throttle.get_executor(), cr, throttle);
+  throttle.spawn(cr);
+  throttle.spawn(cr);
+  throttle.spawn(err);
+  throttle.spawn(cr);
   EXPECT_THROW(throttle.wait(), std::logic_error);
 
   EXPECT_EQ(3, completed);
@@ -196,10 +196,10 @@ TEST(YieldGroupSync, spawn_cancel_on_error_after)
   };
 
   auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::after};
-  asio::spawn(throttle.get_executor(), cr, throttle);
-  asio::spawn(throttle.get_executor(), cr, throttle);
-  asio::spawn(throttle.get_executor(), err, throttle);
-  asio::spawn(throttle.get_executor(), cr, throttle);
+  throttle.spawn(cr);
+  throttle.spawn(cr);
+  throttle.spawn(err);
+  throttle.spawn(cr);
   EXPECT_THROW(throttle.wait(), std::logic_error);
 
   EXPECT_EQ(2, completed);
@@ -219,10 +219,10 @@ TEST(YieldGroupSync, spawn_cancel_on_error_all)
   };
 
   auto throttle = spawn_throttle{null_yield, 4, cancel_on_error::all};
-  asio::spawn(throttle.get_executor(), cr, throttle);
-  asio::spawn(throttle.get_executor(), cr, throttle);
-  asio::spawn(throttle.get_executor(), err, throttle);
-  asio::spawn(throttle.get_executor(), cr, throttle);
+  throttle.spawn(cr);
+  throttle.spawn(cr);
+  throttle.spawn(err);
+  throttle.spawn(cr);
   EXPECT_THROW(throttle.wait(), std::logic_error);
 
   EXPECT_EQ(0, completed);
@@ -247,7 +247,7 @@ TEST(YieldGroupAsync, spawn_wait)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 2};
-      asio::spawn(yield, wait_on(waiter), throttle);
+      throttle.spawn(wait_on(waiter));
       throttle.wait(); // blocks
     }, rethrow);
 
@@ -273,10 +273,10 @@ TEST(YieldGroupAsync, spawn_over_limit)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 2};
-      asio::spawn(yield, wait_on(waiter1), throttle);
-      asio::spawn(yield, wait_on(waiter2), throttle);
-      asio::spawn(yield, wait_on(waiter3), throttle); // blocks
-      asio::spawn(yield, wait_on(waiter4), throttle); // blocks
+      throttle.spawn(wait_on(waiter1));
+      throttle.spawn(wait_on(waiter2));
+      throttle.spawn(wait_on(waiter3)); // blocks
+      throttle.spawn(wait_on(waiter4)); // blocks
       throttle.wait(); // blocks
     }, rethrow);
 
@@ -320,7 +320,7 @@ TEST(YieldGroupAsync, spawn_shutdown)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 2};
-      asio::spawn(yield, wait_on(waiter1), throttle);
+      throttle.spawn(wait_on(waiter1));
       waiter2.async_wait(yield); // blocks
       // shut down while there's an outstanding child but throttle is not
       // waiting on spawn() or wait()
@@ -340,8 +340,8 @@ TEST(YieldGroupAsync, spawn_throttled_shutdown)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 1};
-      asio::spawn(yield, wait_on(waiter1), throttle);
-      asio::spawn(yield, wait_on(waiter2), throttle); // blocks
+      throttle.spawn(wait_on(waiter1));
+      throttle.spawn(wait_on(waiter2)); // blocks
       // shut down while we're throttled on the second spawn
     }, rethrow);
 
@@ -358,7 +358,7 @@ TEST(YieldGroupAsync, spawn_wait_shutdown)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 1};
-      asio::spawn(yield, wait_on(waiter), throttle);
+      throttle.spawn(wait_on(waiter));
       throttle.wait(); // blocks
       // shut down while we're wait()ing
     }, rethrow);
@@ -378,8 +378,8 @@ TEST(YieldGroupAsync, spawn_throttled_error)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 1};
-      asio::spawn(yield, wait_on(waiter1), throttle);
-      asio::spawn(yield, wait_on(waiter2), throttle); // blocks
+      throttle.spawn(wait_on(waiter1));
+      throttle.spawn(wait_on(waiter2)); // blocks
     }, capture(result));
 
   ctx.poll();
@@ -413,8 +413,8 @@ TEST(YieldGroupAsync, spawn_throttled_signal)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 1};
-      asio::spawn(yield, wait_on(waiter1), throttle);
-      asio::spawn(yield, wait_on(waiter2), throttle); // blocks
+      throttle.spawn(wait_on(waiter1));
+      throttle.spawn(wait_on(waiter2)); // blocks
     }, capture(signal, result));
 
   ctx.poll();
@@ -446,7 +446,7 @@ TEST(YieldGroupAsync, spawn_wait_error)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 1};
-      asio::spawn(yield, wait_on(waiter), throttle);
+      throttle.spawn(wait_on(waiter));
       throttle.wait(); // blocks
     }, capture(result));
 
@@ -479,7 +479,7 @@ TEST(YieldGroupAsync, spawn_wait_signal)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 1};
-      asio::spawn(yield, wait_on(waiter), throttle);
+      throttle.spawn(wait_on(waiter));
       throttle.wait(); // blocks
     }, capture(signal, result));
 
@@ -511,7 +511,7 @@ TEST(YieldGroupAsync, spawn_cancel_wait)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 2};
-      asio::spawn(yield, wait_on(waiter), throttle);
+      throttle.spawn(wait_on(waiter));
       throttle.cancel();
       throttle.wait();
     }, capture(result));
@@ -539,9 +539,9 @@ TEST(YieldGroupAsync, spawn_cancel_on_error_none)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 4, cancel_on_error::none};
-      asio::spawn(yield, wait_on(waiter1), throttle);
-      asio::spawn(yield, wait_on(waiter2), throttle);
-      asio::spawn(yield, wait_on(waiter3), throttle);
+      throttle.spawn(wait_on(waiter1));
+      throttle.spawn(wait_on(waiter2));
+      throttle.spawn(wait_on(waiter3));
       throttle.wait(); // blocks
     }, capture(result));
 
@@ -586,9 +586,9 @@ TEST(YieldGroupAsync, spawn_cancel_on_error_after)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 4, cancel_on_error::after};
-      asio::spawn(yield, wait_on(waiter1), throttle);
-      asio::spawn(yield, wait_on(waiter2), throttle);
-      asio::spawn(yield, wait_on(waiter3), throttle);
+      throttle.spawn(wait_on(waiter1));
+      throttle.spawn(wait_on(waiter2));
+      throttle.spawn(wait_on(waiter3));
       throttle.wait(); // blocks
     }, capture(result));
 
@@ -629,9 +629,9 @@ TEST(YieldGroupAsync, spawn_cancel_on_error_all)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 4, cancel_on_error::all};
-      asio::spawn(yield, wait_on(waiter1), throttle);
-      asio::spawn(yield, wait_on(waiter2), throttle);
-      asio::spawn(yield, wait_on(waiter3), throttle);
+      throttle.spawn(wait_on(waiter1));
+      throttle.spawn(wait_on(waiter2));
+      throttle.spawn(wait_on(waiter3));
       throttle.wait(); // blocks
     }, capture(result));
 
@@ -665,9 +665,9 @@ TEST(YieldGroupAsync, spawn_wait_spawn_wait)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 1};
-      asio::spawn(yield, wait_on(waiter1), throttle);
+      throttle.spawn(wait_on(waiter1));
       throttle.wait(); // blocks
-      asio::spawn(yield, wait_on(waiter2), throttle);
+      throttle.spawn(wait_on(waiter2));
       throttle.wait(); // blocks
     }, rethrow);
 
@@ -698,10 +698,10 @@ TEST(YieldGroupAsync, spawn_cancel_wait_spawn_wait)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 1};
-      asio::spawn(yield, wait_on(waiter1), throttle);
+      throttle.spawn(wait_on(waiter1));
       throttle.cancel();
       EXPECT_THROW(throttle.wait(), boost::system::system_error);
-      asio::spawn(yield, wait_on(waiter2), throttle);
+      throttle.spawn(wait_on(waiter2));
       throttle.wait(); // blocks
     }, rethrow);
 
@@ -723,9 +723,9 @@ TEST(YieldGroupAsync, spawn_error_wait_spawn_wait)
 
   asio::spawn(ctx, [&] (asio::yield_context yield) {
       auto throttle = spawn_throttle{yield, 1};
-      asio::spawn(yield, wait_on(waiter1), throttle);
+      throttle.spawn(wait_on(waiter1));
       EXPECT_THROW(throttle.wait(), boost::system::system_error);
-      asio::spawn(yield, wait_on(waiter2), throttle);
+      throttle.spawn(wait_on(waiter2));
       throttle.wait(); // blocks
     }, rethrow);