]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: yield_waiter overloads for unique_lock
authorCasey Bodley <cbodley@redhat.com>
Mon, 24 Mar 2025 16:50:16 +0000 (12:50 -0400)
committerMarcel Lauhoff <marcel.lauhoff@clyso.com>
Mon, 1 Jun 2026 16:43:28 +0000 (18:43 +0200)
if async_wait() can race with complete() across threads, the
yield_waiter's handler_state needs to be protected by a mutex. add
an async_wait() overload for unique_lock that behaves like
condition_variable::wait(): the lock is released immediately before
suspending, and reacquired immediately before calling its completion
handler

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/yield_waiter.h
src/test/common/test_async_yield_waiter.cc

index 9206ac1f505b790e7d3d079d43b5da7f399cdf45..6ace1f2efc467692aae3a3455d57a695f8cca5d0 100644 (file)
@@ -16,6 +16,7 @@
 #pragma once
 
 #include <exception>
+#include <mutex>
 #include <optional>
 #include <boost/asio/append.hpp>
 #include <boost/asio/associated_cancellation_slot.hpp>
 
 namespace ceph::async {
 
+namespace detail {
+
+// handler wrapper that reacquires a lock immediately before completion
+template <typename Handler, typename BasicLockable>
+struct lock_handler {
+  Handler handler;
+  BasicLockable* lock = nullptr;
+
+  template <typename ...Args>
+  void operator()(Args&& ...args) {
+    if (lock) {
+      lock->lock();
+    }
+    std::move(handler)(std::forward<Args>(args)...);
+  }
+};
+
+// deduction guide required by windows?
+template <typename Handler, typename BasicLockable>
+lock_handler(Handler&&, BasicLockable*) -> lock_handler<Handler, BasicLockable>;
+
+} // namespace detail
+
 /// Captures a yield_context handler for deferred completion or cancellation.
 template <typename Ret>
 class yield_waiter {
@@ -52,7 +76,25 @@ class yield_waiter {
           if (slot.is_connected()) {
             slot.template emplace<op_cancellation>(this);
           }
-          state.emplace(std::move(h));
+          constexpr std::unique_lock<std::mutex>* lock = nullptr;
+          state.emplace(std::move(h), lock);
+        }, token);
+  }
+
+  /// Suspends the given yield_context until the captured handler is invoked
+  /// via complete() or cancel(). The given lock is released immediately before
+  /// it suspends and reacquired immediately after it resumes.
+  template <typename CompletionToken>
+  auto async_wait(std::unique_lock<std::mutex>& lock, CompletionToken&& token)
+  {
+    return boost::asio::async_initiate<CompletionToken, Signature>(
+        [this, &lock] (handler_type h) {
+          auto slot = get_associated_cancellation_slot(h);
+          if (slot.is_connected()) {
+            slot.template emplace<op_cancellation>(this);
+          }
+          state.emplace(std::move(h), &lock);
+          lock.unlock(); // unlock before suspend
         }, token);
   }
 
@@ -61,8 +103,10 @@ class yield_waiter {
   {
     auto s = std::move(*state);
     state.reset();
-    auto h = boost::asio::append(std::move(s.handler), ec, std::move(value));
-    boost::asio::dispatch(std::move(h));
+    boost::asio::dispatch(
+        boost::asio::append(
+            detail::lock_handler{std::move(s.handler), s.lock},
+            ec, std::move(value)));
   }
 
   /// Destroy the completion handler.
@@ -80,10 +124,10 @@ class yield_waiter {
   struct handler_state {
     handler_type handler;
     work_guard work;
+    std::unique_lock<std::mutex>* lock = nullptr;
 
-    explicit handler_state(handler_type&& h)
-      : handler(std::move(h)),
-        work(make_work_guard(handler))
+    handler_state(handler_type&& h, std::unique_lock<std::mutex>* lock)
+      : handler(std::move(h)), work(make_work_guard(handler)), lock(lock)
     {}
   };
   std::optional<handler_state> state;
@@ -134,7 +178,25 @@ class yield_waiter<void> {
           if (slot.is_connected()) {
             slot.template emplace<op_cancellation>(this);
           }
-          state.emplace(std::move(h));
+          constexpr std::unique_lock<std::mutex>* lock = nullptr;
+          state.emplace(std::move(h), lock);
+        }, token);
+  }
+
+  /// Suspends the given yield_context until the captured handler is invoked
+  /// via complete() or cancel(). The given lock is released immediately before
+  /// it suspends and reacquired immediately after it resumes.
+  template <typename CompletionToken>
+  auto async_wait(std::unique_lock<std::mutex>& lock, CompletionToken&& token)
+  {
+    return boost::asio::async_initiate<CompletionToken, Signature>(
+        [this, &lock] (handler_type h) {
+          auto slot = get_associated_cancellation_slot(h);
+          if (slot.is_connected()) {
+            slot.template emplace<op_cancellation>(this);
+          }
+          state.emplace(std::move(h), &lock);
+          lock.unlock(); // unlock before suspend
         }, token);
   }
 
@@ -143,7 +205,9 @@ class yield_waiter<void> {
   {
     auto s = std::move(*state);
     state.reset();
-    boost::asio::dispatch(boost::asio::append(std::move(s.handler), ec));
+    boost::asio::dispatch(
+        boost::asio::append(
+            detail::lock_handler{std::move(s.handler), s.lock}, ec));
   }
 
   /// Destroy the completion handler.
@@ -161,10 +225,10 @@ class yield_waiter<void> {
   struct handler_state {
     handler_type handler;
     work_guard work;
+    std::unique_lock<std::mutex>* lock = nullptr;
 
-    explicit handler_state(handler_type&& h)
-      : handler(std::move(h)),
-        work(make_work_guard(handler))
+    handler_state(handler_type&& h, std::unique_lock<std::mutex>* lock)
+      : handler(std::move(h)), work(make_work_guard(handler)), lock(lock)
     {}
   };
   std::optional<handler_state> state;
@@ -189,3 +253,23 @@ class yield_waiter<void> {
 };
 
 } // namespace ceph::async
+
+namespace boost::asio {
+
+// forward the handler's associated executor, allocator, cancellation slot, etc
+template <template <typename, typename> class Associator,
+          typename Handler, typename BasicLockable, typename DefaultCandidate>
+struct associator<Associator,
+    ceph::async::detail::lock_handler<Handler, BasicLockable>, DefaultCandidate>
+  : Associator<Handler, DefaultCandidate>
+{
+  static auto get(const ceph::async::detail::lock_handler<Handler, BasicLockable>& h) noexcept {
+    return Associator<Handler, DefaultCandidate>::get(h.handler);
+  }
+  static auto get(const ceph::async::detail::lock_handler<Handler, BasicLockable>& h,
+                  const DefaultCandidate& c) noexcept {
+    return Associator<Handler, DefaultCandidate>::get(h.handler, c);
+  }
+};
+
+} // namespace boost::asio
index 69247e2b6641682735124723b7366daff4c7a976..3ffa2b478d46efdac797595bf3467e09bf8ffbaa 100644 (file)
@@ -101,6 +101,71 @@ TEST(YieldWaiterVoid, wait_error)
   }
 }
 
+TEST(YieldWaiterVoid, wait_locked_complete)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter;
+  std::optional<std::exception_ptr> eptr;
+
+  std::mutex mutex;
+  auto lock = std::unique_lock{mutex};
+
+  asio::spawn(ctx, [&waiter, &lock] (asio::yield_context yield) {
+        ASSERT_TRUE(lock.owns_lock());
+        waiter.async_wait(lock, yield);
+        EXPECT_TRUE(lock.owns_lock());
+      }, capture(eptr));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+
+  ASSERT_FALSE(lock.owns_lock()); // unlocked while suspended
+  ASSERT_TRUE(waiter);
+  waiter.complete(error_code{});
+  EXPECT_FALSE(waiter);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(lock.owns_lock());
+}
+
+TEST(YieldWaiterVoid, wait_locked_error)
+{
+  asio::io_context ctx;
+  yield_waiter<void> waiter;
+  std::optional<std::exception_ptr> eptr;
+
+  std::mutex mutex;
+  auto lock = std::unique_lock{mutex};
+
+  asio::spawn(ctx, [&waiter, &lock] (asio::yield_context yield) {
+        ASSERT_TRUE(lock.owns_lock());
+        waiter.async_wait(lock, yield);
+        EXPECT_TRUE(lock.owns_lock());
+      }, capture(eptr));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+
+  ASSERT_FALSE(lock.owns_lock()); // unlocked while suspended
+  ASSERT_TRUE(waiter);
+  waiter.complete(make_error_code(asio::error::operation_aborted));
+  EXPECT_FALSE(waiter);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(lock.owns_lock());
+  ASSERT_TRUE(eptr);
+  ASSERT_TRUE(*eptr);
+  try {
+    std::rethrow_exception(*eptr);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
 
 TEST(YieldWaiterInt, wait_shutdown)
 {
@@ -241,6 +306,76 @@ TEST(YieldWaiterPtr, wait_error)
   }
 }
 
+TEST(YieldWaiterPtr, wait_locked_complete)
+{
+  asio::io_context ctx;
+  yield_waiter<std::unique_ptr<int>> waiter;
+  std::optional<std::unique_ptr<int>> result;
+
+  std::mutex mutex;
+  auto lock = std::unique_lock{mutex};
+
+  asio::spawn(ctx, [&waiter, &lock, &result] (asio::yield_context yield) {
+        ASSERT_TRUE(lock.owns_lock());
+        result = waiter.async_wait(lock, yield);
+        EXPECT_TRUE(lock.owns_lock());
+      }, rethrow);
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_FALSE(lock.owns_lock()); // unlocked while suspended
+
+  ASSERT_TRUE(waiter);
+  waiter.complete(error_code{}, std::make_unique<int>(42));
+  EXPECT_FALSE(waiter);
+
+  ctx.poll();
+  EXPECT_TRUE(ctx.stopped());
+  ASSERT_TRUE(lock.owns_lock());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_EQ(42, **result);
+}
+
+TEST(YieldWaiterPtr, wait_locked_error)
+{
+  asio::io_context ctx;
+  yield_waiter<std::unique_ptr<int>> waiter;
+  std::optional<std::unique_ptr<int>> result;
+  std::optional<std::exception_ptr> eptr;
+
+  std::mutex mutex;
+  auto lock = std::unique_lock{mutex};
+
+  asio::spawn(ctx, [&waiter, &lock, &result] (asio::yield_context yield) {
+        ASSERT_TRUE(lock.owns_lock());
+        result = waiter.async_wait(lock, yield);
+        EXPECT_TRUE(lock.owns_lock());
+      }, capture(eptr));
+
+  ctx.poll();
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(lock.owns_lock()); // unlocked while suspended
+
+  ASSERT_TRUE(waiter);
+  waiter.complete(make_error_code(std::errc::no_such_file_or_directory), nullptr);
+  EXPECT_FALSE(waiter);
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(lock.owns_lock());
+  EXPECT_FALSE(result);
+  ASSERT_TRUE(eptr);
+  ASSERT_TRUE(*eptr);
+  try {
+    std::rethrow_exception(*eptr);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), std::errc::no_such_file_or_directory);
+  } catch (const std::exception&) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
 void invoke_callback(int expected_reply, std::function<void(int)> cb) {
   auto t = std::thread([cb, expected_reply] {
       cb(expected_reply);