]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: add call_once() algorithm for optional_yield
authorCasey Bodley <cbodley@redhat.com>
Mon, 24 Mar 2025 16:51:15 +0000 (12:51 -0400)
committerMarcel Lauhoff <marcel.lauhoff@clyso.com>
Mon, 1 Jun 2026 16:43:29 +0000 (18:43 +0200)
modeled after std::call_once() to guarantee that racing callers wait for
the initial caller to finish. the main differences here are

* support for coroutine callers to suspend instead of blocking while
  waiting for the initial caller, and
* the wrapped function must return a value, which is cached and returned
  to all callers

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/call_once.h [new file with mode: 0644]
src/common/async/detail/call_once.h [new file with mode: 0644]
src/test/common/CMakeLists.txt
src/test/common/test_async_call_once.cc [new file with mode: 0644]

diff --git a/src/common/async/call_once.h b/src/common/async/call_once.h
new file mode 100644 (file)
index 0000000..8c48a2c
--- /dev/null
@@ -0,0 +1,113 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <concepts>
+#include <exception>
+#include <mutex>
+#include <variant>
+
+#include "include/function2.hpp"
+
+#include "yield_context.h"
+#include "detail/call_once.h"
+
+namespace ceph::async {
+
+/// Stores the result of call_once(), if any.
+///
+/// The cached Result type must be semiregular (copyable and default-
+/// constructible).
+template <std::semiregular Result>
+class once_result {
+ public:
+  /// Call f() once and cache its return value.
+  ///
+  /// If a call has already completed, return the cached result. If a call
+  /// is already in progress, wait for its completion and return the result.
+  /// If a yield context is provided in y, its coroutine will be suspended
+  /// while waiting. f() may also suspend/resume the same coroutine.
+  ///
+  /// f() must return a Result when called with no arguments. If it throws,
+  /// that exception is rethrown by all calls.
+  ///
+  /// This function is thread-safe.
+  template <typename Callable>
+  friend Result call_once(once_result& self, optional_yield y, Callable&& f)
+      requires std::convertible_to<std::invoke_result_t<Callable>, Result>
+  {
+    auto lock = std::unique_lock{self.mutex};
+    return std::visit(
+        fu2::overload(
+            [&] (const Init&) { return self.do_init(lock, f); },
+            [&] (Wait& w) { return self.do_wait(lock, y, w); },
+            [] (const Complete& c) { return do_complete(c); }),
+        self.state);
+  }
+
+ private:
+  std::mutex mutex;
+
+  // state machine: [Init] -> [Wait] -> [Complete]
+  using Init = std::monostate;
+  using Wait = detail::call_once::WaitState;
+  struct Complete {
+    Result result;
+    std::exception_ptr eptr;
+  };
+  std::variant<Init, Wait, Complete> state;
+
+  Result do_init(std::unique_lock<std::mutex>& lock, auto&& f)
+  {
+    // transition to Wait state
+    auto& wait = state.template emplace<Wait>();
+    lock.unlock();
+
+    Complete complete;
+    try {
+      complete.result = f();
+    } catch (const std::exception&) {
+      complete.eptr = std::current_exception();
+    }
+
+    lock.lock();
+    // wake any waiters
+    wait.wake_all();
+
+    // transition to Complete state and return the result
+    return do_complete(state.template emplace<Complete>(std::move(complete)));
+  }
+
+  Result do_wait(std::unique_lock<std::mutex>& lock,
+                 optional_yield y, Wait& wait)
+  {
+    // wait for the response to an outstanding request
+    wait.wait(lock, y);
+
+    // state must be Complete after wakeup
+    return do_complete(std::get<Complete>(state));
+  }
+
+  static Result do_complete(const Complete& complete)
+  {
+    if (complete.eptr) { // rethrow cached exception
+      std::rethrow_exception(complete.eptr);
+    }
+    return complete.result; // return cached result
+  }
+};
+
+} // namespace ceph::async
diff --git a/src/common/async/detail/call_once.h b/src/common/async/detail/call_once.h
new file mode 100644 (file)
index 0000000..c7d6764
--- /dev/null
@@ -0,0 +1,129 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+
+#include <boost/intrusive/list.hpp>
+
+#include "common/async/service.h"
+#include "common/async/yield_context.h"
+#include "common/async/yield_waiter.h"
+
+namespace ceph::async::detail::call_once {
+
+class AsyncWaiter : public boost::intrusive::list_base_hook<> {
+  yield_waiter<void> impl;
+ public:
+  void wait(std::unique_lock<std::mutex>& lock,
+            boost::asio::yield_context yield)
+  {
+    impl.async_wait(lock, yield);
+  }
+  void wake()
+  {
+    // async_wait() may complete inline via boost::asio::dispatch(), but would
+    // immediately try to reacquire the lock we're already holding. defer the
+    // completion until after we've returned and dropped our lock
+    boost::asio::defer(impl.get_executor(), [this] {
+        impl.complete(boost::system::error_code{});
+      });
+  }
+  void destroy()
+  {
+    impl.shutdown();
+  }
+};
+
+class SyncWaiter : public boost::intrusive::list_base_hook<> {
+  std::condition_variable cond;
+  bool done = false;
+ public:
+  void wait(std::unique_lock<std::mutex>& lock)
+  {
+    cond.wait(lock, [this] { return done; });
+  }
+  void wake()
+  {
+    done = true;
+    cond.notify_one();
+  }
+};
+
+struct WaitState : service_list_base_hook {
+  service<WaitState>* svc = nullptr;
+  boost::intrusive::list<SyncWaiter> sync_waiters;
+  boost::intrusive::list<AsyncWaiter> async_waiters;
+
+  ~WaitState()
+  {
+    if (svc) {
+      svc->remove(*this);
+    }
+  }
+
+  void wait(std::unique_lock<std::mutex>& lock, optional_yield y)
+  {
+    if (y) {
+      auto yield = y.get_yield_context();
+      if (!svc) {
+        // on first async wait, register for service_shutdown() notifications
+        svc = &boost::asio::use_service<service<WaitState>>(
+            boost::asio::query(yield.get_executor(),
+                               boost::asio::execution::context));
+        svc->add(*this);
+      }
+
+      AsyncWaiter waiter;
+      async_waiters.push_back(waiter);
+
+      waiter.wait(lock, yield);
+    } else {
+      SyncWaiter waiter;
+      sync_waiters.push_back(waiter);
+
+      waiter.wait(lock);
+    }
+  }
+
+  void wake_all()
+  {
+    while (!sync_waiters.empty()) {
+      auto& waiter = sync_waiters.front();
+      sync_waiters.pop_front();
+      waiter.wake();
+    }
+    while (!async_waiters.empty()) {
+      auto& waiter = async_waiters.front();
+      async_waiters.pop_front();
+      waiter.wake();
+    }
+  }
+
+  // on shutdown, destroy async waiters that didn't complete. this breaks any
+  // ownership cycles between once_result and the completion handlers
+  void service_shutdown()
+  {
+    while (!async_waiters.empty()) {
+      auto& waiter = async_waiters.front();
+      async_waiters.pop_front();
+      waiter.destroy();
+    }
+  }
+};
+
+} // namespace ceph::async::detail::call_once
index c2190d7e9813edf5ef39864d9a789acda4372669..cbbce114355090163644c0d1cafe4c9ded00ed36 100644 (file)
@@ -377,6 +377,10 @@ add_executable(unittest_hobject test_hobject.cc
 target_link_libraries(unittest_hobject global ceph-common)
 add_ceph_unittest(unittest_hobject)
 
+add_executable(unittest_async_call_once test_async_call_once.cc)
+add_ceph_unittest(unittest_async_call_once)
+target_link_libraries(unittest_async_call_once ceph-common Boost::context)
+
 add_executable(unittest_async_completion test_async_completion.cc)
 add_ceph_unittest(unittest_async_completion)
 target_link_libraries(unittest_async_completion ceph-common Boost::headers)
diff --git a/src/test/common/test_async_call_once.cc b/src/test/common/test_async_call_once.cc
new file mode 100644 (file)
index 0000000..c0fc76c
--- /dev/null
@@ -0,0 +1,291 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/async/call_once.h"
+
+#include <exception>
+#include <future>
+#include <latch>
+#include <optional>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
+#include <gtest/gtest.h>
+#include "common/async/yield_waiter.h"
+
+void rethrow(std::exception_ptr eptr)
+{
+  if (eptr) std::rethrow_exception(eptr);
+}
+
+auto capture(std::optional<std::exception_ptr>& out) {
+  return [&out] (std::exception_ptr eptr) {
+    out = std::move(eptr);
+  };
+}
+
+template <typename T>
+auto capture(std::optional<T>& out) {
+  return [&out] (std::exception_ptr eptr, T value) {
+    rethrow(eptr);
+    out = std::move(value);
+  };
+}
+
+namespace ceph::async {
+
+TEST(CallOnceAsync, call_wait_shutdown)
+{
+  boost::asio::io_context context;
+  // test that memory doesn't leak when both coroutines hold a shared_ptr to
+  // the once_result they're waiting on
+  auto once = std::make_shared<once_result<int>>();
+  yield_waiter<int> waiter;
+
+  boost::asio::spawn(context, [once, &waiter] (boost::asio::yield_context yield) {
+        call_once(*once, yield, [&] { return waiter.async_wait(yield); });
+      }, rethrow);
+  boost::asio::spawn(context, [once] (boost::asio::yield_context yield) {
+        call_once(*once, yield, [] { return 0; });
+      }, rethrow);
+
+  context.poll();
+  EXPECT_FALSE(context.stopped()); // blocked on waiter, never completes
+}
+
+TEST(CallOnceAsync, call_immediate_result)
+{
+  boost::asio::io_context context;
+  once_result<int> once;
+
+  std::optional<int> result;
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+        return call_once(once, yield, [] { return 0; });
+      }, capture(result));
+
+  context.poll();
+  EXPECT_TRUE(context.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_EQ(*result, 0);
+}
+
+TEST(CallOnceAsync, call_immediate_exception)
+{
+  boost::asio::io_context context;
+  once_result<int> once;
+
+  std::optional<std::exception_ptr> eptr;
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+        call_once(once, yield, [] { throw std::bad_alloc(); return 0; });
+      }, capture(eptr));
+
+  context.poll();
+  EXPECT_TRUE(context.stopped());
+  ASSERT_TRUE(eptr);
+  EXPECT_THROW(std::rethrow_exception(*eptr), std::bad_alloc);
+}
+
+TEST(CallOnceAsync, call_wait_result)
+{
+  boost::asio::io_context context;
+  once_result<int> once;
+  yield_waiter<int> waiter;
+
+  std::optional<int> call_result;
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+        return call_once(once, yield, [&] { return waiter.async_wait(yield); });
+      }, capture(call_result));
+
+  std::optional<int> wait_result;
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+        return call_once(once, yield, [] { return 0; });
+      }, capture(wait_result));
+
+  context.poll();
+  EXPECT_FALSE(context.stopped());
+  ASSERT_FALSE(call_result);
+  ASSERT_FALSE(wait_result);
+
+  waiter.complete(boost::system::error_code{}, 42);
+
+  context.poll();
+  EXPECT_TRUE(context.stopped());
+  ASSERT_TRUE(call_result);
+  EXPECT_EQ(*call_result, 42);
+  ASSERT_TRUE(wait_result);
+  EXPECT_EQ(*wait_result, 42);
+
+  std::optional<int> cached_result;
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+        return call_once(once, yield, [] { return 0; });
+      }, capture(cached_result));
+
+  context.restart();
+  context.poll();
+  EXPECT_TRUE(context.stopped());
+  ASSERT_TRUE(cached_result);
+  EXPECT_EQ(*cached_result, 42);
+}
+
+TEST(CallOnceAsync, call_wait_exception)
+{
+  boost::asio::io_context context;
+  once_result<int> once;
+  yield_waiter<void> waiter;
+
+  std::optional<std::exception_ptr> call_eptr;
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+        call_once(once, yield, [&] {
+            waiter.async_wait(yield);
+            throw std::bad_alloc();
+            return 0;
+          });
+      }, capture(call_eptr));
+
+  std::optional<std::exception_ptr> wait_eptr;
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+        call_once(once, yield, [] { return 0; });
+      }, capture(wait_eptr));
+
+  context.poll();
+  EXPECT_FALSE(context.stopped());
+  ASSERT_FALSE(call_eptr);
+  ASSERT_FALSE(wait_eptr);
+
+  waiter.complete(boost::system::error_code{});
+
+  context.poll();
+  EXPECT_TRUE(context.stopped());
+  ASSERT_TRUE(call_eptr);
+  EXPECT_THROW(std::rethrow_exception(*call_eptr), std::bad_alloc);
+  ASSERT_TRUE(wait_eptr);
+  EXPECT_THROW(std::rethrow_exception(*wait_eptr), std::bad_alloc);
+
+  std::optional<std::exception_ptr> cached_eptr;
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+        call_once(once, yield, [] { return 0; });
+      }, capture(cached_eptr));
+
+  context.restart();
+  context.poll();
+  EXPECT_TRUE(context.stopped());
+  ASSERT_TRUE(cached_eptr);
+  EXPECT_THROW(std::rethrow_exception(*cached_eptr), std::bad_alloc);
+}
+
+TEST(CallOnceSync, call_immediate_result)
+{
+  once_result<int> once;
+
+  EXPECT_EQ(call_once(once, null_yield, [] { return 42; }), 42);
+}
+
+TEST(CallOnceSync, call_immediate_result_lvalue)
+{
+  once_result<int> once;
+
+  // test that call_once() properly forwards the Callable so
+  // it can be passed by lvalue-ref without copy/move
+  struct noncopyable_fn {
+   public:
+    noncopyable_fn() = default;
+    noncopyable_fn(const noncopyable_fn&) = delete;
+    noncopyable_fn& operator=(const noncopyable_fn&) = delete;
+
+    int operator()() { return 42; }
+  };
+  noncopyable_fn fn;
+
+  EXPECT_EQ(call_once(once, null_yield, fn), 42);
+}
+
+TEST(CallOnceSync, call_immediate_exception)
+{
+  once_result<int> once;
+
+  EXPECT_THROW(call_once(once, null_yield, [] { throw std::bad_alloc(); return 0; }), std::bad_alloc);
+}
+
+TEST(CallOnceSync, call_wait_result)
+{
+  once_result<int> once;
+
+  std::future<int> call_future;
+  std::future<int> wait_future;
+  std::latch call_latch{1};
+  std::latch wait_latch{2};
+
+  call_future = std::async(std::launch::async, [&] {
+      return call_once(once, null_yield, [&] {
+          wait_future = std::async(std::launch::async, [&] {
+              wait_latch.count_down();
+              return call_once(once, null_yield, [] { return 0; });
+            });
+          wait_latch.count_down();
+          call_latch.wait();
+          return 42;
+        });
+    });
+
+  wait_latch.wait();
+
+  using namespace std::chrono_literals;
+  EXPECT_NE(wait_future.wait_for(0s), std::future_status::ready);
+
+  call_latch.count_down(); // let call return
+
+  EXPECT_EQ(call_future.get(), 42);
+  EXPECT_EQ(wait_future.get(), 42);
+
+  // return cached result
+  EXPECT_EQ(call_once(once, null_yield, [] { return 0; }), 42);
+}
+
+TEST(CallOnceSync, call_wait_exception)
+{
+  once_result<int> once;
+
+  std::future<int> call_future;
+  std::future<int> wait_future;
+  std::latch call_latch{1};
+  std::latch wait_latch{2};
+
+  call_future = std::async(std::launch::async, [&] {
+      return call_once(once, null_yield, [&] {
+          wait_future = std::async(std::launch::async, [&] {
+              wait_latch.count_down();
+              return call_once(once, null_yield, [] { return 0; });
+            });
+          wait_latch.count_down();
+          call_latch.wait();
+          throw std::bad_alloc();
+          return 0;
+        });
+    });
+
+  wait_latch.wait();
+
+  using namespace std::chrono_literals;
+  EXPECT_NE(wait_future.wait_for(0s), std::future_status::ready);
+
+  call_latch.count_down(); // let call return
+
+  EXPECT_THROW(call_future.get(), std::bad_alloc);
+  EXPECT_THROW(wait_future.get(), std::bad_alloc);
+
+  // rethrow cached exception
+  EXPECT_THROW(call_once(once, null_yield, [] { return 0; }), std::bad_alloc);
+}
+
+} // namespace ceph::async