--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <concepts>
+#include <exception>
+#include <mutex>
+#include <variant>
+
+#include "include/function2.hpp"
+
+#include "yield_context.h"
+#include "detail/call_once.h"
+
+namespace ceph::async {
+
+/// Stores the result of call_once(), if any.
+///
+/// The cached Result type must be semiregular (copyable and default-
+/// constructible).
+template <std::semiregular Result>
+class once_result {
+ public:
+ /// Call f() once and cache its return value.
+ ///
+ /// If a call has already completed, return the cached result. If a call
+ /// is already in progress, wait for its completion and return the result.
+ /// If a yield context is provided in y, its coroutine will be suspended
+ /// while waiting. f() may also suspend/resume the same coroutine.
+ ///
+ /// f() must return a Result when called with no arguments. If it throws,
+ /// that exception is rethrown by all calls.
+ ///
+ /// This function is thread-safe.
+ template <typename Callable>
+ friend Result call_once(once_result& self, optional_yield y, Callable&& f)
+ requires std::convertible_to<std::invoke_result_t<Callable>, Result>
+ {
+ auto lock = std::unique_lock{self.mutex};
+ return std::visit(
+ fu2::overload(
+ [&] (const Init&) { return self.do_init(lock, f); },
+ [&] (Wait& w) { return self.do_wait(lock, y, w); },
+ [] (const Complete& c) { return do_complete(c); }),
+ self.state);
+ }
+
+ private:
+ std::mutex mutex;
+
+ // state machine: [Init] -> [Wait] -> [Complete]
+ using Init = std::monostate;
+ using Wait = detail::call_once::WaitState;
+ struct Complete {
+ Result result;
+ std::exception_ptr eptr;
+ };
+ std::variant<Init, Wait, Complete> state;
+
+ Result do_init(std::unique_lock<std::mutex>& lock, auto&& f)
+ {
+ // transition to Wait state
+ auto& wait = state.template emplace<Wait>();
+ lock.unlock();
+
+ Complete complete;
+ try {
+ complete.result = f();
+ } catch (const std::exception&) {
+ complete.eptr = std::current_exception();
+ }
+
+ lock.lock();
+ // wake any waiters
+ wait.wake_all();
+
+ // transition to Complete state and return the result
+ return do_complete(state.template emplace<Complete>(std::move(complete)));
+ }
+
+ Result do_wait(std::unique_lock<std::mutex>& lock,
+ optional_yield y, Wait& wait)
+ {
+ // wait for the response to an outstanding request
+ wait.wait(lock, y);
+
+ // state must be Complete after wakeup
+ return do_complete(std::get<Complete>(state));
+ }
+
+ static Result do_complete(const Complete& complete)
+ {
+ if (complete.eptr) { // rethrow cached exception
+ std::rethrow_exception(complete.eptr);
+ }
+ return complete.result; // return cached result
+ }
+};
+
+} // namespace ceph::async
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+
+#include <boost/intrusive/list.hpp>
+
+#include "common/async/service.h"
+#include "common/async/yield_context.h"
+#include "common/async/yield_waiter.h"
+
+namespace ceph::async::detail::call_once {
+
+class AsyncWaiter : public boost::intrusive::list_base_hook<> {
+ yield_waiter<void> impl;
+ public:
+ void wait(std::unique_lock<std::mutex>& lock,
+ boost::asio::yield_context yield)
+ {
+ impl.async_wait(lock, yield);
+ }
+ void wake()
+ {
+ // async_wait() may complete inline via boost::asio::dispatch(), but would
+ // immediately try to reacquire the lock we're already holding. defer the
+ // completion until after we've returned and dropped our lock
+ boost::asio::defer(impl.get_executor(), [this] {
+ impl.complete(boost::system::error_code{});
+ });
+ }
+ void destroy()
+ {
+ impl.shutdown();
+ }
+};
+
+class SyncWaiter : public boost::intrusive::list_base_hook<> {
+ std::condition_variable cond;
+ bool done = false;
+ public:
+ void wait(std::unique_lock<std::mutex>& lock)
+ {
+ cond.wait(lock, [this] { return done; });
+ }
+ void wake()
+ {
+ done = true;
+ cond.notify_one();
+ }
+};
+
+struct WaitState : service_list_base_hook {
+ service<WaitState>* svc = nullptr;
+ boost::intrusive::list<SyncWaiter> sync_waiters;
+ boost::intrusive::list<AsyncWaiter> async_waiters;
+
+ ~WaitState()
+ {
+ if (svc) {
+ svc->remove(*this);
+ }
+ }
+
+ void wait(std::unique_lock<std::mutex>& lock, optional_yield y)
+ {
+ if (y) {
+ auto yield = y.get_yield_context();
+ if (!svc) {
+ // on first async wait, register for service_shutdown() notifications
+ svc = &boost::asio::use_service<service<WaitState>>(
+ boost::asio::query(yield.get_executor(),
+ boost::asio::execution::context));
+ svc->add(*this);
+ }
+
+ AsyncWaiter waiter;
+ async_waiters.push_back(waiter);
+
+ waiter.wait(lock, yield);
+ } else {
+ SyncWaiter waiter;
+ sync_waiters.push_back(waiter);
+
+ waiter.wait(lock);
+ }
+ }
+
+ void wake_all()
+ {
+ while (!sync_waiters.empty()) {
+ auto& waiter = sync_waiters.front();
+ sync_waiters.pop_front();
+ waiter.wake();
+ }
+ while (!async_waiters.empty()) {
+ auto& waiter = async_waiters.front();
+ async_waiters.pop_front();
+ waiter.wake();
+ }
+ }
+
+ // on shutdown, destroy async waiters that didn't complete. this breaks any
+ // ownership cycles between once_result and the completion handlers
+ void service_shutdown()
+ {
+ while (!async_waiters.empty()) {
+ auto& waiter = async_waiters.front();
+ async_waiters.pop_front();
+ waiter.destroy();
+ }
+ }
+};
+
+} // namespace ceph::async::detail::call_once
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/async/call_once.h"
+
+#include <exception>
+#include <future>
+#include <latch>
+#include <optional>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
+#include <gtest/gtest.h>
+#include "common/async/yield_waiter.h"
+
+void rethrow(std::exception_ptr eptr)
+{
+ if (eptr) std::rethrow_exception(eptr);
+}
+
+auto capture(std::optional<std::exception_ptr>& out) {
+ return [&out] (std::exception_ptr eptr) {
+ out = std::move(eptr);
+ };
+}
+
+template <typename T>
+auto capture(std::optional<T>& out) {
+ return [&out] (std::exception_ptr eptr, T value) {
+ rethrow(eptr);
+ out = std::move(value);
+ };
+}
+
+namespace ceph::async {
+
+TEST(CallOnceAsync, call_wait_shutdown)
+{
+ boost::asio::io_context context;
+ // test that memory doesn't leak when both coroutines hold a shared_ptr to
+ // the once_result they're waiting on
+ auto once = std::make_shared<once_result<int>>();
+ yield_waiter<int> waiter;
+
+ boost::asio::spawn(context, [once, &waiter] (boost::asio::yield_context yield) {
+ call_once(*once, yield, [&] { return waiter.async_wait(yield); });
+ }, rethrow);
+ boost::asio::spawn(context, [once] (boost::asio::yield_context yield) {
+ call_once(*once, yield, [] { return 0; });
+ }, rethrow);
+
+ context.poll();
+ EXPECT_FALSE(context.stopped()); // blocked on waiter, never completes
+}
+
+TEST(CallOnceAsync, call_immediate_result)
+{
+ boost::asio::io_context context;
+ once_result<int> once;
+
+ std::optional<int> result;
+ boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+ return call_once(once, yield, [] { return 0; });
+ }, capture(result));
+
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+ ASSERT_TRUE(result);
+ EXPECT_EQ(*result, 0);
+}
+
+TEST(CallOnceAsync, call_immediate_exception)
+{
+ boost::asio::io_context context;
+ once_result<int> once;
+
+ std::optional<std::exception_ptr> eptr;
+ boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+ call_once(once, yield, [] { throw std::bad_alloc(); return 0; });
+ }, capture(eptr));
+
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+ ASSERT_TRUE(eptr);
+ EXPECT_THROW(std::rethrow_exception(*eptr), std::bad_alloc);
+}
+
+TEST(CallOnceAsync, call_wait_result)
+{
+ boost::asio::io_context context;
+ once_result<int> once;
+ yield_waiter<int> waiter;
+
+ std::optional<int> call_result;
+ boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+ return call_once(once, yield, [&] { return waiter.async_wait(yield); });
+ }, capture(call_result));
+
+ std::optional<int> wait_result;
+ boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+ return call_once(once, yield, [] { return 0; });
+ }, capture(wait_result));
+
+ context.poll();
+ EXPECT_FALSE(context.stopped());
+ ASSERT_FALSE(call_result);
+ ASSERT_FALSE(wait_result);
+
+ waiter.complete(boost::system::error_code{}, 42);
+
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+ ASSERT_TRUE(call_result);
+ EXPECT_EQ(*call_result, 42);
+ ASSERT_TRUE(wait_result);
+ EXPECT_EQ(*wait_result, 42);
+
+ std::optional<int> cached_result;
+ boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+ return call_once(once, yield, [] { return 0; });
+ }, capture(cached_result));
+
+ context.restart();
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+ ASSERT_TRUE(cached_result);
+ EXPECT_EQ(*cached_result, 42);
+}
+
+TEST(CallOnceAsync, call_wait_exception)
+{
+ boost::asio::io_context context;
+ once_result<int> once;
+ yield_waiter<void> waiter;
+
+ std::optional<std::exception_ptr> call_eptr;
+ boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+ call_once(once, yield, [&] {
+ waiter.async_wait(yield);
+ throw std::bad_alloc();
+ return 0;
+ });
+ }, capture(call_eptr));
+
+ std::optional<std::exception_ptr> wait_eptr;
+ boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+ call_once(once, yield, [] { return 0; });
+ }, capture(wait_eptr));
+
+ context.poll();
+ EXPECT_FALSE(context.stopped());
+ ASSERT_FALSE(call_eptr);
+ ASSERT_FALSE(wait_eptr);
+
+ waiter.complete(boost::system::error_code{});
+
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+ ASSERT_TRUE(call_eptr);
+ EXPECT_THROW(std::rethrow_exception(*call_eptr), std::bad_alloc);
+ ASSERT_TRUE(wait_eptr);
+ EXPECT_THROW(std::rethrow_exception(*wait_eptr), std::bad_alloc);
+
+ std::optional<std::exception_ptr> cached_eptr;
+ boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+ call_once(once, yield, [] { return 0; });
+ }, capture(cached_eptr));
+
+ context.restart();
+ context.poll();
+ EXPECT_TRUE(context.stopped());
+ ASSERT_TRUE(cached_eptr);
+ EXPECT_THROW(std::rethrow_exception(*cached_eptr), std::bad_alloc);
+}
+
+TEST(CallOnceSync, call_immediate_result)
+{
+ once_result<int> once;
+
+ EXPECT_EQ(call_once(once, null_yield, [] { return 42; }), 42);
+}
+
+TEST(CallOnceSync, call_immediate_result_lvalue)
+{
+ once_result<int> once;
+
+ // test that call_once() properly forwards the Callable so
+ // it can be passed by lvalue-ref without copy/move
+ struct noncopyable_fn {
+ public:
+ noncopyable_fn() = default;
+ noncopyable_fn(const noncopyable_fn&) = delete;
+ noncopyable_fn& operator=(const noncopyable_fn&) = delete;
+
+ int operator()() { return 42; }
+ };
+ noncopyable_fn fn;
+
+ EXPECT_EQ(call_once(once, null_yield, fn), 42);
+}
+
+TEST(CallOnceSync, call_immediate_exception)
+{
+ once_result<int> once;
+
+ EXPECT_THROW(call_once(once, null_yield, [] { throw std::bad_alloc(); return 0; }), std::bad_alloc);
+}
+
+TEST(CallOnceSync, call_wait_result)
+{
+ once_result<int> once;
+
+ std::future<int> call_future;
+ std::future<int> wait_future;
+ std::latch call_latch{1};
+ std::latch wait_latch{2};
+
+ call_future = std::async(std::launch::async, [&] {
+ return call_once(once, null_yield, [&] {
+ wait_future = std::async(std::launch::async, [&] {
+ wait_latch.count_down();
+ return call_once(once, null_yield, [] { return 0; });
+ });
+ wait_latch.count_down();
+ call_latch.wait();
+ return 42;
+ });
+ });
+
+ wait_latch.wait();
+
+ using namespace std::chrono_literals;
+ EXPECT_NE(wait_future.wait_for(0s), std::future_status::ready);
+
+ call_latch.count_down(); // let call return
+
+ EXPECT_EQ(call_future.get(), 42);
+ EXPECT_EQ(wait_future.get(), 42);
+
+ // return cached result
+ EXPECT_EQ(call_once(once, null_yield, [] { return 0; }), 42);
+}
+
+TEST(CallOnceSync, call_wait_exception)
+{
+ once_result<int> once;
+
+ std::future<int> call_future;
+ std::future<int> wait_future;
+ std::latch call_latch{1};
+ std::latch wait_latch{2};
+
+ call_future = std::async(std::launch::async, [&] {
+ return call_once(once, null_yield, [&] {
+ wait_future = std::async(std::launch::async, [&] {
+ wait_latch.count_down();
+ return call_once(once, null_yield, [] { return 0; });
+ });
+ wait_latch.count_down();
+ call_latch.wait();
+ throw std::bad_alloc();
+ return 0;
+ });
+ });
+
+ wait_latch.wait();
+
+ using namespace std::chrono_literals;
+ EXPECT_NE(wait_future.wait_for(0s), std::future_status::ready);
+
+ call_latch.count_down(); // let call return
+
+ EXPECT_THROW(call_future.get(), std::bad_alloc);
+ EXPECT_THROW(wait_future.get(), std::bad_alloc);
+
+ // rethrow cached exception
+ EXPECT_THROW(call_once(once, null_yield, [] { return 0; }), std::bad_alloc);
+}
+
+} // namespace ceph::async