From fbd9777055171be5e51434af94e600130cb2fef4 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 31 Jan 2024 15:34:34 -0800 Subject: [PATCH] crimson/common: add coroutine integration for crimson futures Adds coroutine machinery for crimson errorated and interruptible futures. Signed-off-by: Samuel Just (cherry picked from commit 1126ec3cf6285abbf6a875ab27daff397f09220b) --- src/crimson/common/coroutine.h | 310 +++++++++++++++++++ src/test/crimson/CMakeLists.txt | 8 + src/test/crimson/gtest_seastar.h | 27 ++ src/test/crimson/test_crimson_coroutine.cc | 327 +++++++++++++++++++++ 4 files changed, 672 insertions(+) create mode 100644 src/crimson/common/coroutine.h create mode 100644 src/test/crimson/test_crimson_coroutine.cc diff --git a/src/crimson/common/coroutine.h b/src/crimson/common/coroutine.h new file mode 100644 index 0000000000000..c4dfca58c5816 --- /dev/null +++ b/src/crimson/common/coroutine.h @@ -0,0 +1,310 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include + +#include "crimson/common/errorator.h" +#include "crimson/common/interruptible_future.h" + + +namespace crimson { +namespace internal { + +template +struct to_future { + template + using future = crimson::interruptible::interruptible_future_detail< + typename Interruptor::condition, + typename Errorator::template future>; +}; + +template +struct to_future { + template + using future = typename Errorator::template future; +}; + + +template +struct to_future { + template + using future = ::crimson::interruptible::interruptible_future< + typename Interruptor::condition, T>; +}; + +template <> +struct to_future { + template + using future = seastar::future; +}; + + +template +struct cond_checker { + using ref = std::unique_ptr; + virtual std::optional may_interrupt() = 0; + virtual ~cond_checker() = default; +}; + +template +struct interrupt_cond_capture { + using InterruptCond = typename Interruptor::condition; + interruptible::InterruptCondRef cond; + + template + struct type_erased_cond_checker final : cond_checker { + interruptible::InterruptCondRef cond; + + template + type_erased_cond_checker(T &&t) : cond(std::forward(t)) {} + + std::optional may_interrupt() final { + return cond->template may_interrupt(); + } + }; + + template + typename cond_checker::ref capture_and_get_checker() { + ceph_assert(interruptible::interrupt_cond.interrupt_cond); + cond = interruptible::interrupt_cond.interrupt_cond; + return typename cond_checker::ref{ + new type_erased_cond_checker{cond} + }; + } + + void restore() { + ceph_assert(cond); + interruptible::interrupt_cond.set(cond); + } + + void reset() { + interruptible::interrupt_cond.reset(); + } +}; + +template <> +struct interrupt_cond_capture { + template + typename cond_checker::ref capture_and_get_checker() { + return nullptr; + } +}; + +template +struct seastar_task_ancestor : protected seastar::task {}; + +template <> +struct seastar_task_ancestor : public seastar::task {}; + +template +class promise_base : public seastar_task_ancestor { +protected: + seastar::promise _promise; + +public: + interrupt_cond_capture cond; + + using errorator_type = Errorator; + using interruptor = Interruptor; + static constexpr bool is_errorated = !std::is_void::value; + static constexpr bool is_interruptible = !std::is_void::value; + + using _to_future = to_future; + + template + using future = typename _to_future::template future; + + promise_base() = default; + promise_base(promise_base&&) = delete; + promise_base(const promise_base&) = delete; + + void set_exception(std::exception_ptr&& eptr) noexcept { + _promise.set_exception(std::move(eptr)); + } + + void unhandled_exception() noexcept { + _promise.set_exception(std::current_exception()); + } + + future get_return_object() noexcept { + return _promise.get_future(); + } + + std::suspend_never initial_suspend() noexcept { return { }; } + std::suspend_never final_suspend() noexcept { return { }; } + + void run_and_dispose() noexcept final { + if constexpr (is_interruptible) { + cond.restore(); + } + auto handle = std::coroutine_handle::from_promise(*this); + handle.resume(); + if constexpr (is_interruptible) { + cond.reset(); + } + } + + seastar::task *waiting_task() noexcept override { + return _promise.waiting_task(); + } + seastar::task *get_seastar_task() { return this; } +}; + +template +class coroutine_traits { +public: + class promise_type final : public promise_base { + using base = promise_base; + public: + template + void return_value(U&&... value) { + base::_promise.set_value(std::forward(value)...); + } + }; +}; + + +template +class coroutine_traits { +public: + class promise_type final : public promise_base { + using base = promise_base; + public: + void return_void() noexcept { + base::_promise.set_value(); + } + }; +}; + +template +struct awaiter { + static constexpr bool is_errorated = !std::is_void::value; + static constexpr bool is_interruptible = !std::is_void::value; + + template + using future = typename to_future::template future; + + future _future; + + typename cond_checker>::ref checker; +public: + explicit awaiter(future&& f) noexcept : _future(std::move(f)) { } + + awaiter(const awaiter&) = delete; + awaiter(awaiter&&) = delete; + + bool await_ready() const noexcept { + return _future.available() && (!CheckPreempt || !seastar::need_preempt()); + } + + template + void await_suspend(std::coroutine_handle hndl) noexcept { + if constexpr (is_errorated) { + using dest_errorator_t = typename U::errorator_type; + static_assert(dest_errorator_t::template contains_once_v, + "conversion is possible to more-or-eq errorated future!"); + } + + checker = + hndl.promise().cond.template capture_and_get_checker>(); + if (!CheckPreempt || !_future.available()) { + _future.set_coroutine(*hndl.promise().get_seastar_task()); + } else { + ::seastar::schedule(hndl.promise().get_seastar_task()); + } + } + + T await_resume() { + if (auto maybe_fut = checker ? checker->may_interrupt() : std::nullopt) { + // silence warning that we are discarding an exceptional future + if (_future.failed()) _future.get_exception(); + if constexpr (is_errorated) { + return maybe_fut->unsafe_get0(); + } else { + return maybe_fut->get0(); + } + } else { + if constexpr (is_errorated) { + return _future.unsafe_get0(); + } else { + return _future.get0(); + } + } + } +}; + +} +} + +template