--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include <seastar/core/coroutine.hh>
+
+#include "crimson/common/errorator.h"
+#include "crimson/common/interruptible_future.h"
+
+
+namespace crimson {
+namespace internal {
+
+template <typename Interruptor, typename Errorator>
+struct to_future {
+ template <typename T>
+ using future = crimson::interruptible::interruptible_future_detail<
+ typename Interruptor::condition,
+ typename Errorator::template future<T>>;
+};
+
+template <typename Errorator>
+struct to_future<void, Errorator> {
+ template <typename T>
+ using future = typename Errorator::template future<T>;
+};
+
+
+template <typename Interruptor>
+struct to_future<Interruptor, void> {
+ template <typename T>
+ using future = ::crimson::interruptible::interruptible_future<
+ typename Interruptor::condition, T>;
+};
+
+template <>
+struct to_future<void, void> {
+ template <typename T>
+ using future = seastar::future<T>;
+};
+
+
+template <typename Future>
+struct cond_checker {
+ using ref = std::unique_ptr<cond_checker>;
+ virtual std::optional<Future> may_interrupt() = 0;
+ virtual ~cond_checker() = default;
+};
+
+template <typename Interruptor>
+struct interrupt_cond_capture {
+ using InterruptCond = typename Interruptor::condition;
+ interruptible::InterruptCondRef<InterruptCond> cond;
+
+ template <typename Future>
+ struct type_erased_cond_checker final : cond_checker<Future> {
+ interruptible::InterruptCondRef<InterruptCond> cond;
+
+ template <typename T>
+ type_erased_cond_checker(T &&t) : cond(std::forward<T>(t)) {}
+
+ std::optional<Future> may_interrupt() final {
+ return cond->template may_interrupt<Future>();
+ }
+ };
+
+ template <typename Future>
+ typename cond_checker<Future>::ref capture_and_get_checker() {
+ ceph_assert(interruptible::interrupt_cond<InterruptCond>.interrupt_cond);
+ cond = interruptible::interrupt_cond<InterruptCond>.interrupt_cond;
+ return typename cond_checker<Future>::ref{
+ new type_erased_cond_checker<Future>{cond}
+ };
+ }
+
+ void restore() {
+ ceph_assert(cond);
+ interruptible::interrupt_cond<InterruptCond>.set(cond);
+ }
+
+ void reset() {
+ interruptible::interrupt_cond<InterruptCond>.reset();
+ }
+};
+
+template <>
+struct interrupt_cond_capture<void> {
+ template <typename Future>
+ typename cond_checker<Future>::ref capture_and_get_checker() {
+ return nullptr;
+ }
+};
+
+template <typename Interruptor>
+struct seastar_task_ancestor : protected seastar::task {};
+
+template <>
+struct seastar_task_ancestor<void> : public seastar::task {};
+
+template <typename Interruptor, typename Errorator, typename T>
+class promise_base : public seastar_task_ancestor<Interruptor> {
+protected:
+ seastar::promise<T> _promise;
+
+public:
+ interrupt_cond_capture<Interruptor> cond;
+
+ using errorator_type = Errorator;
+ using interruptor = Interruptor;
+ static constexpr bool is_errorated = !std::is_void<Errorator>::value;
+ static constexpr bool is_interruptible = !std::is_void<Interruptor>::value;
+
+ using _to_future = to_future<Interruptor, Errorator>;
+
+ template <typename U=void>
+ using future = typename _to_future::template future<U>;
+
+ promise_base() = default;
+ promise_base(promise_base&&) = delete;
+ promise_base(const promise_base&) = delete;
+
+ void set_exception(std::exception_ptr&& eptr) noexcept {
+ _promise.set_exception(std::move(eptr));
+ }
+
+ void unhandled_exception() noexcept {
+ _promise.set_exception(std::current_exception());
+ }
+
+ future<T> get_return_object() noexcept {
+ return _promise.get_future();
+ }
+
+ std::suspend_never initial_suspend() noexcept { return { }; }
+ std::suspend_never final_suspend() noexcept { return { }; }
+
+ void run_and_dispose() noexcept final {
+ if constexpr (is_interruptible) {
+ cond.restore();
+ }
+ auto handle = std::coroutine_handle<promise_base>::from_promise(*this);
+ handle.resume();
+ if constexpr (is_interruptible) {
+ cond.reset();
+ }
+ }
+
+ seastar::task *waiting_task() noexcept override {
+ return _promise.waiting_task();
+ }
+ seastar::task *get_seastar_task() { return this; }
+};
+
+template <typename Interruptor, typename Errorator, typename T=void>
+class coroutine_traits {
+public:
+ class promise_type final : public promise_base<Interruptor, Errorator, T> {
+ using base = promise_base<Interruptor, Errorator, T>;
+ public:
+ template <typename... U>
+ void return_value(U&&... value) {
+ base::_promise.set_value(std::forward<U>(value)...);
+ }
+ };
+};
+
+
+template <typename Interruptor, typename Errorator>
+class coroutine_traits<Interruptor, Errorator> {
+public:
+ class promise_type final : public promise_base<Interruptor, Errorator, void> {
+ using base = promise_base<Interruptor, Errorator, void>;
+ public:
+ void return_void() noexcept {
+ base::_promise.set_value();
+ }
+ };
+};
+
+template <typename Interruptor, typename Errorator,
+ bool CheckPreempt, typename T=void>
+struct awaiter {
+ static constexpr bool is_errorated = !std::is_void<Errorator>::value;
+ static constexpr bool is_interruptible = !std::is_void<Interruptor>::value;
+
+ template <typename U=void>
+ using future = typename to_future<Interruptor, Errorator>::template future<U>;
+
+ future<T> _future;
+
+ typename cond_checker<future<T>>::ref checker;
+public:
+ explicit awaiter(future<T>&& f) noexcept : _future(std::move(f)) { }
+
+ awaiter(const awaiter&) = delete;
+ awaiter(awaiter&&) = delete;
+
+ bool await_ready() const noexcept {
+ return _future.available() && (!CheckPreempt || !seastar::need_preempt());
+ }
+
+ template <typename U>
+ void await_suspend(std::coroutine_handle<U> hndl) noexcept {
+ if constexpr (is_errorated) {
+ using dest_errorator_t = typename U::errorator_type;
+ static_assert(dest_errorator_t::template contains_once_v<Errorator>,
+ "conversion is possible to more-or-eq errorated future!");
+ }
+
+ checker =
+ hndl.promise().cond.template capture_and_get_checker<future<T>>();
+ if (!CheckPreempt || !_future.available()) {
+ _future.set_coroutine(*hndl.promise().get_seastar_task());
+ } else {
+ ::seastar::schedule(hndl.promise().get_seastar_task());
+ }
+ }
+
+ T await_resume() {
+ if (auto maybe_fut = checker ? checker->may_interrupt() : std::nullopt) {
+ // silence warning that we are discarding an exceptional future
+ if (_future.failed()) _future.get_exception();
+ if constexpr (is_errorated) {
+ return maybe_fut->unsafe_get0();
+ } else {
+ return maybe_fut->get0();
+ }
+ } else {
+ if constexpr (is_errorated) {
+ return _future.unsafe_get0();
+ } else {
+ return _future.get0();
+ }
+ }
+ }
+};
+
+}
+}
+
+template <template <typename> typename Container, typename T>
+auto operator co_await(
+ Container<crimson::errorated_future_marker<T>> f) noexcept {
+ using Errorator = typename seastar::futurize<decltype(f)>::errorator_type;
+ return crimson::internal::awaiter<void, Errorator, true, T>(std::move(f));
+}
+
+template <typename InterruptCond, typename T>
+auto operator co_await(
+ crimson::interruptible::interruptible_future_detail<
+ InterruptCond, seastar::future<T>
+ > f) noexcept {
+ return crimson::internal::awaiter<
+ crimson::interruptible::interruptor<InterruptCond>, void, true, T>(
+ std::move(f));
+}
+
+template <template <typename> typename Container,
+ typename InterruptCond, typename T>
+auto operator co_await(
+ crimson::interruptible::interruptible_future_detail<
+ InterruptCond, Container<crimson::errorated_future_marker<T>>
+ > f) noexcept {
+ using Errorator = typename seastar::futurize<decltype(f)>::errorator_type;
+ return crimson::internal::awaiter<
+ crimson::interruptible::interruptor<InterruptCond>,
+ typename Errorator::base_ertr, true, T>(
+ std::move(f));
+}
+
+namespace std {
+
+template <template <typename> typename Container,
+ typename T, typename... Args>
+class coroutine_traits<Container<crimson::errorated_future_marker<T>>, Args...> :
+ public crimson::internal::coroutine_traits<
+ void,
+ typename seastar::futurize<
+ Container<crimson::errorated_future_marker<T>>
+ >::errorator_type,
+ T> {};
+
+template <typename InterruptCond,
+ typename T, typename... Args>
+class coroutine_traits<
+ crimson::interruptible::interruptible_future_detail<
+ InterruptCond, seastar::future<T>
+ >, Args...> : public crimson::internal::coroutine_traits<
+ crimson::interruptible::interruptor<InterruptCond>,
+ void,
+ T> {};
+
+template <template <typename> typename Container,
+ typename InterruptCond,
+ typename T, typename... Args>
+class coroutine_traits<
+ crimson::interruptible::interruptible_future_detail<
+ InterruptCond, Container<crimson::errorated_future_marker<T>>
+ >, Args...> :
+ public crimson::internal::coroutine_traits<
+ crimson::interruptible::interruptor<InterruptCond>,
+ typename seastar::futurize<
+ crimson::interruptible::interruptible_future_detail<
+ InterruptCond,
+ Container<crimson::errorated_future_marker<T>>
+ >
+ >::errorator_type::base_ertr,
+ T> {};
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/iterator/counting_iterator.hpp>
+#include <numeric>
+
+#include "seastar/core/sleep.hh"
+
+#include "crimson/common/coroutine.h"
+#include "crimson/common/errorator.h"
+#include "crimson/common/interruptible_future.h"
+#include "crimson/common/log.h"
+
+#include "test/crimson/gtest_seastar.h"
+
+struct coroutine_test_t : public seastar_test_suite_t {
+ struct interruption_state_t {
+ bool interrupted = false;
+ } interruption_state;
+
+ class test_interruption : public std::exception
+ {};
+
+ class test_interrupt_cond {
+ interruption_state_t *int_state = nullptr;
+ public:
+ test_interrupt_cond() = delete;
+ test_interrupt_cond(interruption_state_t *int_state)
+ : int_state(int_state) {}
+
+ template <typename T>
+ std::optional<T> may_interrupt() {
+ ceph_assert(int_state);
+ if (int_state->interrupted) {
+ return seastar::futurize<T>::make_exception_future(
+ test_interruption()
+ );
+ } else {
+ return std::nullopt;
+ }
+ }
+
+ template <typename T>
+ static constexpr bool is_interruption_v = std::is_same_v<
+ T, test_interruption>;
+
+ static bool is_interruption(std::exception_ptr& eptr) {
+ if (*eptr.__cxa_exception_type() == typeid(test_interruption))
+ return true;
+ return false;
+ }
+ };
+ using interruptor = crimson::interruptible::interruptor<test_interrupt_cond>;
+
+ using ertr = crimson::errorator<crimson::ct_error::invarg>;
+ using iertr = crimson::interruptible::interruptible_errorator<
+ test_interrupt_cond,
+ ertr>;
+
+ using ertr2 = ertr::extend<
+ crimson::ct_error::eagain>;
+ using iertr2 = crimson::interruptible::interruptible_errorator<
+ test_interrupt_cond,
+ ertr2>;
+
+ using ertr3 = ertr::extend<
+ crimson::ct_error::enoent>;
+ using iertr3 = crimson::interruptible::interruptible_errorator<
+ test_interrupt_cond,
+ ertr3>;
+
+ void interrupt() {
+ interruption_state.interrupted = true;
+ }
+
+ seastar::future<> set_up_fut() final {
+ interruption_state.interrupted = false;
+ return seastar::now();
+ }
+
+
+ template <typename E, typename F>
+ auto cwi(E &&errf, F &&f) {
+ return interruptor::with_interruption(
+ scl(std::forward<F>(f)),
+ std::forward<E>(errf),
+ &interruption_state);
+ }
+};
+
+namespace crimson::interruptible {
+template
+thread_local interrupt_cond_t<coroutine_test_t::test_interrupt_cond>
+interrupt_cond<coroutine_test_t::test_interrupt_cond>;
+}
+
+TEST_F(coroutine_test_t, test_coroutine)
+{
+ run_scl([]() -> seastar::future<> {
+ constexpr int CHECK = 20;
+ auto unwrapped = co_await seastar::make_ready_future<int>(CHECK);
+ EXPECT_EQ(unwrapped, CHECK);
+ });
+}
+
+TEST_F(coroutine_test_t, test_ertr_coroutine_basic)
+{
+ run_ertr_scl([]() -> ertr::future<> {
+ constexpr int CHECK = 20;
+ auto unwrapped = co_await ertr::make_ready_future<int>(CHECK);
+ EXPECT_EQ(unwrapped, CHECK);
+ });
+}
+
+TEST_F(coroutine_test_t, test_ertr_coroutine_vanilla_future)
+{
+ run_ertr_scl([]() -> ertr::future<> {
+ constexpr int CHECK = 20;
+ auto unwrapped = co_await seastar::make_ready_future<int>(CHECK);
+ EXPECT_EQ(unwrapped, CHECK);
+ });
+}
+
+TEST_F(coroutine_test_t, test_ertr_coroutine_error)
+{
+ run_scl([this]() -> seastar::future<> {
+ auto fut = scl([]() -> ertr::future<int> {
+ std::ignore = co_await ertr::future<int>(
+ crimson::ct_error::invarg::make()
+ );
+ EXPECT_EQ("above co_await should throw", nullptr);
+ co_return 10;
+ })();
+ auto ret = co_await std::move(fut).handle_error(
+ [](const crimson::ct_error::invarg &e) {
+ return 20;
+ }
+ );
+ EXPECT_EQ(ret, 20);
+ });
+}
+
+#if 0
+// This one is left in, but commented out, as a test which *should fail to
+// build* due to trying to co_await a more errorated future.
+TEST_F(coroutine_test_t, test_basic_ertr_coroutine_error_should_not_build)
+{
+ run_ertr_scl([]() -> ertr::future<int> {
+ constexpr int CHECK = 20;
+ auto unwrapped = co_await ertr2::make_ready_future<int>(CHECK);
+ EXPECT_EQ(unwrapped, CHECK);
+ co_return 10;
+ });
+}
+#endif
+
+TEST_F(coroutine_test_t, interruptible_coroutine_basic)
+{
+ run_scl([this]() -> seastar::future<> {
+ seastar::promise<int> p;
+ auto ret = cwi(
+ [](auto) { return 2; },
+ [f=p.get_future()]() mutable -> interruptor::future<int> {
+ auto x = co_await interruptor::make_interruptible(std::move(f));
+ co_return x;
+ });
+ p.set_value(0);
+ auto awaited = co_await std::move(ret);
+ EXPECT_EQ(awaited, 0);
+ });
+}
+
+TEST_F(coroutine_test_t, interruptible_coroutine_interrupted)
+{
+ run_scl([this]() -> seastar::future<> {
+ seastar::promise<int> p;
+ auto ret = cwi(
+ [](auto) { return 2; },
+ [f=p.get_future()]() mutable -> interruptor::future<int> {
+ auto x = co_await interruptor::make_interruptible(std::move(f));
+ co_return x;
+ });
+ interrupt();
+ p.set_value(0);
+ auto awaited = co_await std::move(ret);
+ EXPECT_EQ(awaited, 2);
+ });
+}
+
+TEST_F(coroutine_test_t, dual_interruptible_coroutine)
+{
+ run_scl([this]() -> seastar::future<> {
+ seastar::promise<int> p, p2;
+ auto fut1 = cwi(
+ [](auto) { return 2; },
+ [&p, f=p2.get_future()]() mutable -> interruptor::future<int> {
+ auto x = co_await interruptor::make_interruptible(std::move(f));
+ p.set_value(1);
+ co_return x;
+ });
+ auto fut2 = cwi(
+ [](auto) { return 2; },
+ [&p2, f=p.get_future()]() mutable -> interruptor::future<int> {
+ p2.set_value(0);
+ auto x = co_await interruptor::make_interruptible(std::move(f));
+ co_return x;
+ });
+
+ auto ret1 = co_await std::move(fut1);
+ auto ret2 = co_await std::move(fut2);
+ EXPECT_EQ(ret1, 0);
+ EXPECT_EQ(ret2, 1);
+ });
+}
+
+TEST_F(coroutine_test_t, dual_interruptible_coroutine_interrupted)
+{
+ run_scl([this]() -> seastar::future<> {
+ seastar::promise<int> p, p2;
+ auto fut1 = cwi(
+ [](auto) { return 2; },
+ [this, &p, f=p2.get_future()]() mutable -> interruptor::future<int> {
+ auto x = co_await interruptor::make_interruptible(std::move(f));
+ interrupt();
+ p.set_value(1);
+ co_return x;
+ });
+ auto fut2 = cwi(
+ [](auto) { return 2; },
+ [&p2, f=p.get_future()]() mutable -> interruptor::future<int> {
+ p2.set_value(0);
+ auto x = co_await interruptor::make_interruptible(std::move(f));
+ co_return x;
+ });
+
+ auto ret1 = co_await std::move(fut1);
+ auto ret2 = co_await std::move(fut2);
+ EXPECT_EQ(ret1, 0);
+ EXPECT_EQ(ret2, 2);
+ });
+}
+
+TEST_F(coroutine_test_t, test_iertr_coroutine_basic)
+{
+ run_ertr_scl([this]() -> ertr2::future<> {
+ auto ret = co_await cwi(
+ [](auto) { return 10; },
+ []() -> iertr::future<int> {
+ co_return 20;
+ });
+ EXPECT_EQ(ret, 20);
+ });
+}
+
+TEST_F(coroutine_test_t, test_iertr_coroutine_interruption_as_error)
+{
+ run_ertr_scl([this]() -> ertr2::future<> {
+ auto ret = co_await cwi(
+ [](auto) {
+ return ertr2::future<int>(crimson::ct_error::eagain::make());
+ },
+ []() -> iertr::future<int> {
+ co_return 20;
+ });
+ EXPECT_EQ(ret, 20);
+ });
+}
+
+TEST_F(coroutine_test_t, test_iertr_coroutine_interruption_as_error_interrupted)
+{
+ run_ertr_scl([this]() -> ertr::future<> {
+ seastar::promise<> p;
+ auto f = cwi(
+ [](auto) {
+ return ertr2::future<int>(crimson::ct_error::eagain::make());
+ },
+ [&p]() -> iertr::future<int> {
+ co_await iertr::make_interruptible(p.get_future());
+ co_return 20;
+ });
+ interrupt();
+ p.set_value();
+ auto ret = co_await f.handle_error(
+ crimson::ct_error::eagain::handle([](const auto &) {
+ return 30;
+ }),
+ crimson::ct_error::pass_further_all{}
+ );
+ EXPECT_EQ(ret, 30);
+ });
+}
+
+#if 0
+// the cwi invocation below would yield an ertr2 due to the interruption handler
+TEST_F(coroutine_test_t, test_iertr_coroutine_interruption_should_not_compile)
+{
+ run_ertr_scl([this]() -> ertr::future<> {
+ auto ret = co_await cwi(
+ [](auto) {
+ ertr2::future<int>(crimson::ct_error::eagain::make());
+ },
+ []() -> iertr::future<int> {
+ co_return 20;
+ });
+ EXPECT_EQ(ret, 20);
+ });
+}
+#endif
+
+#if 0
+// can't co_await a vanilla future from an interruptible coroutine
+TEST_F(coroutine_test_t, test_iertr_coroutine_interruption_should_not_compile2)
+{
+ run_ertr_scl([this]() -> ertr2::future<> {
+ auto ret = co_await cwi(
+ [](auto) {
+ return ertr2::future<int>(crimson::ct_error::eagain::make());
+ },
+ []() -> iertr::future<int> {
+ co_await seastar::now();
+ co_return 20;
+ });
+ EXPECT_EQ(ret, 20);
+ });
+}
+#endif
+