]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: fold async_spawn_throttle_impl into spawn_throttle_impl 64624/head
authorCasey Bodley <cbodley@redhat.com>
Tue, 22 Jul 2025 19:01:12 +0000 (15:01 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 28 Jul 2025 15:56:24 +0000 (11:56 -0400)
spawn_throttle_impl was a virtual base class for the sync_ and async_
variants. with the sync variant removed, combine the classes into one
and remove the use of virtual functions

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/detail/spawn_throttle_impl.h
src/common/async/spawn_throttle.h

index c47aa2a518c6671d5fe6172db64b4dcf79f58e6e..4284d81c7246a1186f10a1e2994eb7720155275d 100644 (file)
 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
 #include "common/async/cancel_on_error.h"
 #include "common/async/service.h"
-#include "common/async/yield_context.h"
 
 namespace ceph::async::detail {
 
 struct spawn_throttle_handler;
 
-// Reference-counted spawn throttle interface.
+// Reference-counted spawn throttle implementation.
 class spawn_throttle_impl :
     public boost::intrusive_ref_counter<spawn_throttle_impl,
-        boost::thread_unsafe_counter>
+        boost::thread_unsafe_counter>,
+    public service_list_base_hook
 {
  public:
-  spawn_throttle_impl(size_t limit, cancel_on_error on_error)
-    : limit(limit), on_error(on_error),
+  spawn_throttle_impl(boost::asio::yield_context yield,
+                      size_t limit, cancel_on_error on_error)
+    : svc(boost::asio::use_service<service<spawn_throttle_impl>>(
+              boost::asio::query(yield.get_executor(),
+                                 boost::asio::execution::context))),
+      yield(yield), limit(limit), on_error(on_error),
       children(std::make_unique<child[]>(limit))
   {
+    // register for service_shutdown() notifications
+    svc.add(*this);
+
     // initialize the free list
     for (size_t i = 0; i < limit; i++) {
       free.push_back(children[i]);
     }
   }
-  virtual ~spawn_throttle_impl() {}
+
+  ~spawn_throttle_impl()
+  {
+    svc.remove(*this);
+  }
 
   // return the completion handler for a new child. may block due to throttling
   // or rethrow an exception from a previously-spawned child
@@ -63,19 +74,42 @@ class spawn_throttle_impl :
   };
 
   using executor_type = boost::asio::any_io_executor;
-  virtual executor_type get_executor() = 0;
+  executor_type get_executor()
+  {
+    return yield.get_executor();
+  }
 
   // wait until count <= target_count
-  virtual void wait_for(size_t target_count) = 0;
+  void wait_for(size_t target_count)
+  {
+    if (count > target_count) {
+      wait_for_count = target_count;
+
+      boost::asio::async_initiate<boost::asio::yield_context, WaitSignature>(
+          [this] (auto handler) {
+            auto slot = get_associated_cancellation_slot(handler);
+            if (slot.is_connected()) {
+              slot.template emplace<op_cancellation>(this);
+            }
+            waiter.emplace(std::move(handler));
+          }, yield);
+      // this is a coroutine, so the wait has completed by this point
+    }
+
+    report_exception(); // throw unreported exception
+  }
 
   // cancel outstanding coroutines
-  virtual void cancel(bool shutdown)
+  void cancel()
   {
     cancel_outstanding_from(outstanding.begin());
+    if (waiter) {
+      wait_complete(make_error_code(boost::asio::error::operation_aborted));
+    }
   }
 
   // complete the given child coroutine
-  virtual void on_complete(child& c, std::exception_ptr eptr)
+  void on_complete(child& c, std::exception_ptr eptr)
   {
     --count;
 
@@ -98,9 +132,21 @@ class spawn_throttle_impl :
       }
       cancel_outstanding_from(cancel_from);
     }
+
+    if (waiter && count <= wait_for_count) {
+      wait_complete({});
+    }
+  }
+
+
+  void service_shutdown()
+  {
+    waiter.reset();
   }
 
- protected:
+ private:
+  service<spawn_throttle_impl>& svc;
+  boost::asio::yield_context yield;
   const size_t limit;
   const cancel_on_error on_error;
   size_t count = 0;
@@ -112,7 +158,6 @@ class spawn_throttle_impl :
     }
   }
 
- private:
   std::exception_ptr unreported_exception;
   std::unique_ptr<child[]> children;
 
@@ -130,6 +175,42 @@ class spawn_throttle_impl :
       c.signal->emit(boost::asio::cancellation_type::terminal);
     }
   }
+
+  using WaitSignature = void(boost::system::error_code);
+  struct wait_state {
+    using Work = boost::asio::executor_work_guard<
+        boost::asio::any_io_executor>;
+    using Handler = typename boost::asio::async_result<
+        boost::asio::yield_context, WaitSignature>::handler_type;
+
+    Work work;
+    Handler handler;
+
+    explicit wait_state(Handler&& h)
+      : work(make_work_guard(h)),
+        handler(std::move(h))
+    {}
+  };
+  std::optional<wait_state> waiter;
+  size_t wait_for_count = 0;
+
+  struct op_cancellation {
+    spawn_throttle_impl* self;
+    explicit op_cancellation(spawn_throttle_impl* self) noexcept
+      : self(self) {}
+    void operator()(boost::asio::cancellation_type type) {
+      if (type != boost::asio::cancellation_type::none) {
+        self->cancel();
+      }
+    }
+  };
+
+  void wait_complete(boost::system::error_code ec)
+  {
+    auto w = std::move(*waiter);
+    waiter.reset();
+    boost::asio::dispatch(boost::asio::append(std::move(w.handler), ec));
+  }
 };
 
 // A cancellable spawn() completion handler that notifies the spawn_throttle
@@ -184,114 +265,4 @@ inline spawn_throttle_handler spawn_throttle_impl::get()
   return {this, c};
 }
 
-class async_spawn_throttle_impl final :
-    public spawn_throttle_impl,
-    public service_list_base_hook
-{
- public:
-  async_spawn_throttle_impl(boost::asio::yield_context yield,
-                            size_t limit, cancel_on_error on_error)
-    : spawn_throttle_impl(limit, on_error),
-      svc(boost::asio::use_service<service<async_spawn_throttle_impl>>(
-              boost::asio::query(yield.get_executor(),
-                                 boost::asio::execution::context))),
-      yield(yield)
-  {
-    // register for service_shutdown() notifications
-    svc.add(*this);
-  }
-
-  ~async_spawn_throttle_impl()
-  {
-    svc.remove(*this);
-  }
-
-  executor_type get_executor() override
-  {
-    return yield.get_executor();
-  }
-
-  void service_shutdown()
-  {
-    waiter.reset();
-  }
-
- private:
-  service<async_spawn_throttle_impl>& svc;
-  boost::asio::yield_context yield;
-
-  using WaitSignature = void(boost::system::error_code);
-  struct wait_state {
-    using Work = boost::asio::executor_work_guard<
-        boost::asio::any_io_executor>;
-    using Handler = typename boost::asio::async_result<
-        boost::asio::yield_context, WaitSignature>::handler_type;
-
-    Work work;
-    Handler handler;
-
-    explicit wait_state(Handler&& h)
-      : work(make_work_guard(h)),
-        handler(std::move(h))
-    {}
-  };
-  std::optional<wait_state> waiter;
-  size_t wait_for_count = 0;
-
-  struct op_cancellation {
-    async_spawn_throttle_impl* self;
-    explicit op_cancellation(async_spawn_throttle_impl* self) noexcept
-      : self(self) {}
-    void operator()(boost::asio::cancellation_type type) {
-      if (type != boost::asio::cancellation_type::none) {
-        self->cancel(false);
-      }
-    }
-  };
-
-  void wait_for(size_t target_count) override
-  {
-    if (count > target_count) {
-      wait_for_count = target_count;
-
-      boost::asio::async_initiate<boost::asio::yield_context, WaitSignature>(
-          [this] (auto handler) {
-            auto slot = get_associated_cancellation_slot(handler);
-            if (slot.is_connected()) {
-              slot.template emplace<op_cancellation>(this);
-            }
-            waiter.emplace(std::move(handler));
-          }, yield);
-      // this is a coroutine, so the wait has completed by this point
-    }
-
-    report_exception(); // throw unreported exception
-  }
-
-  void wait_complete(boost::system::error_code ec)
-  {
-    auto w = std::move(*waiter);
-    waiter.reset();
-    boost::asio::dispatch(boost::asio::append(std::move(w.handler), ec));
-  }
-
-  void on_complete(child& c, std::exception_ptr eptr) override
-  {
-    spawn_throttle_impl::on_complete(c, eptr);
-
-    if (waiter && count <= wait_for_count) {
-      wait_complete({});
-    }
-  }
-
-  void cancel(bool shutdown) override
-  {
-    spawn_throttle_impl::cancel(shutdown);
-
-    if (waiter) {
-      wait_complete(make_error_code(boost::asio::error::operation_aborted));
-    }
-  }
-};
-
 } // namespace ceph::async::detail
index 41aae346b1bb688e4510e96d6cccae4b0a6c5d78..51826ea0e9f06c55c2f16451c083041ae2e26a36 100644 (file)
@@ -19,7 +19,6 @@
 
 #include <boost/intrusive_ptr.hpp>
 #include "cancel_on_error.h"
-#include "yield_context.h"
 
 namespace ceph::async {
 
@@ -60,7 +59,7 @@ class spawn_throttle {
  public:
   spawn_throttle(boost::asio::yield_context yield, size_t limit,
                  cancel_on_error on_error = cancel_on_error::none)
-    : impl(new detail::async_spawn_throttle_impl(yield, limit, on_error))
+    : impl(new detail::spawn_throttle_impl(yield, limit, on_error))
   {}
 
   spawn_throttle(spawn_throttle&&) = default;
@@ -73,7 +72,7 @@ class spawn_throttle {
   ~spawn_throttle()
   {
     if (impl) {
-      impl->cancel(true);
+      impl->cancel();
     }
   }
 
@@ -117,7 +116,7 @@ class spawn_throttle {
   /// Cancel all outstanding coroutines.
   void cancel()
   {
-    impl->cancel(false);
+    impl->cancel();
   }
 };