]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: co_throttle_impl uses co_waiter
authorCasey Bodley <cbodley@redhat.com>
Mon, 30 Jan 2023 01:31:21 +0000 (20:31 -0500)
committerAdam Emerson <aemerson@redhat.com>
Thu, 14 Sep 2023 21:48:00 +0000 (17:48 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/detail/co_throttle_impl.h

index 43e4fcdbcaec647af24bea9c91991f2be8e9399c..d1a89db94c0644c4f22748a2902017b108919d8f 100644 (file)
@@ -23,6 +23,7 @@
 #include <boost/intrusive/list.hpp>
 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
 #include "common/async/cancel_on_error.h"
+#include "common/async/co_waiter.h"
 #include "common/async/service.h"
 #include "include/ceph_assert.h"
 
@@ -127,14 +128,16 @@ class co_throttle_impl :
       c.canceled = true;
       c.signal->emit(boost::asio::cancellation_type::terminal);
     }
-    if (wait_handler) {
-      wait_complete(make_error_code(boost::asio::error::operation_aborted));
+    if (waiter.waiting()) {
+      auto eptr = std::exchange(unreported_exception, nullptr);
+      auto ec = make_error_code(boost::asio::error::operation_aborted);
+      waiter.complete(eptr, ec);
     }
   }
 
   void service_shutdown()
   {
-    wait_handler.reset();
+    waiter.shutdown();
   }
 
  private:
@@ -162,25 +165,14 @@ class co_throttle_impl :
   child_list outstanding;
   child_list free;
 
-  using use_awaitable_t = boost::asio::use_awaitable_t<executor_type>;
-
-  using wait_signature = void(std::exception_ptr, boost::system::error_code);
-  using wait_handler_type = typename boost::asio::async_result<
-      use_awaitable_t, wait_signature>::handler_type;
-  std::optional<wait_handler_type> wait_handler;
+  co_waiter<boost::system::error_code, executor_type> waiter;
 
   // return an awaitable that completes once count <= target_count
   auto wait_for(size_type target_count)
       -> awaitable<boost::system::error_code>
   {
-    ceph_assert(!wait_handler); // one waiter at a time
     wait_for_count = target_count;
-
-    use_awaitable_t token;
-    return boost::asio::async_initiate<use_awaitable_t, wait_signature>(
-        [this] (wait_handler_type h) {
-          wait_handler.emplace(std::move(h));
-        }, token);
+    return waiter.get();
   }
 
   void on_complete(child& c, std::exception_ptr eptr,
@@ -222,21 +214,12 @@ class co_throttle_impl :
     }
 
     // maybe wake the waiter
-    if (wait_handler && count <= wait_for_count) {
-      wait_complete({});
+    if (waiter.waiting() && count <= wait_for_count) {
+      auto eptr = std::exchange(unreported_exception, nullptr);
+      waiter.complete(eptr, {});
     }
   }
 
-  void wait_complete(boost::system::error_code ec)
-  {
-    // bind arguments to the handler for dispatch
-    auto eptr = std::exchange(unreported_exception, nullptr);
-    auto c = boost::asio::append(std::move(*wait_handler), eptr, ec);
-    wait_handler.reset();
-
-    boost::asio::dispatch(std::move(c));
-  }
-
   struct child_completion {
     boost::intrusive_ptr<co_throttle_impl> impl;
     child& c;